Fix #5605 Unblock non container Threads

Ensure that HttpInput is always closed to EOF, EarlyEOF or Error, so that non container threads doing blocking reads
will not block forever, even if late.   Delay recycling of HttpInput until next request is received.
This commit is contained in:
gregw 2021-02-02 14:03:38 +01:00
parent 4bf250fbaa
commit 9cc7be4842
8 changed files with 591 additions and 38 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -362,6 +363,12 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_fillInterest.register(callback); _fillInterest.register(callback);
} }
@Override
public Throwable cancelFillInterest(Supplier<Throwable> cancellation)
{
return _fillInterest.cancel(cancellation);
}
@Override @Override
public boolean tryFillInterested(Callback callback) public boolean tryFillInterested(Callback callback)
{ {

View File

@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException; import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.function.Supplier;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
@ -223,6 +224,8 @@ public interface EndPoint extends Closeable
*/ */
boolean isFillInterested(); boolean isFillInterested();
Throwable cancelFillInterest(Supplier<Throwable> cancellation);
/** /**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either * <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p> * all the data has been flushed or an error occurs.</p>

View File

@ -21,7 +21,9 @@ package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadPendingException; import java.nio.channels.ReadPendingException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -42,6 +44,38 @@ public abstract class FillInterest
{ {
} }
/**
* Cancel a fill interest registration.
*
* If there was a registration, then any {@link #fillable()}, {@link #onClose()} or {@link #onFail(Throwable)}
* calls are remembered and passed to the next registration.
* Since any actions resulting from a call to {@link #needsFillInterest()} cannot be unwound, a subsequent call to
* register will not call {@link #needsFillInterest()} again if it has already been called an no callback received.
* @param cancellation A supplier of the cancellation Throwable to use if there is an existing registration. If the
* suppler or the supplied Throwable is null, then a new {@link CancellationException} is used.
* @return The Throwable used to cancel an existing registration or null if there was no registration to cancel.
*/
public Throwable cancel(Supplier<Throwable> cancellation)
{
Cancelled cancelled = new Cancelled();
while (true)
{
Callback callback = _interested.get();
if (callback == null || callback instanceof Cancelled)
return null;
if (_interested.compareAndSet(callback, cancelled))
{
Throwable cause = cancellation == null ? null : cancellation.get();
if (cause == null)
cause = new CancellationException();
if (LOG.isDebugEnabled())
LOG.debug("cancelled {} {}",this, callback, cause);
callback.failed(cause);
return cause;
}
}
}
/** /**
* Call to register interest in a callback when a read is possible. * Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()} * The callback will be called either immediately if {@link #needsFillInterest()}
@ -68,16 +102,40 @@ public abstract class FillInterest
* @return true if the register succeeded * @return true if the register succeeded
*/ */
public boolean tryRegister(Callback callback) public boolean tryRegister(Callback callback)
{
return register(callback, null);
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
* returns true or eventually once {@link #fillable()} is called.
*
* @param callback the callback to register
* @param cancellation A supplier of a {@link Throwable}, which if not null will be used to fail any existing registration
* @return true if the register succeeded
*/
public boolean register(Callback callback, Supplier<Throwable> cancellation)
{ {
if (callback == null) if (callback == null)
throw new IllegalArgumentException(); throw new IllegalArgumentException();
if (!_interested.compareAndSet(null, callback)) while (true)
{
Callback existing = _interested.get();
if (existing != null && !(existing instanceof Cancelled) && cancellation == null)
return false; return false;
if (LOG.isDebugEnabled()) if (existing == callback)
LOG.debug("interested {}", this); return true;
if (_interested.compareAndSet(existing, callback))
{
if (LOG.isDebugEnabled())
LOG.debug("interested {}->{}", existing, this);
if (existing == null)
{
try try
{ {
needsFillInterest(); needsFillInterest();
@ -86,9 +144,22 @@ public abstract class FillInterest
{ {
onFail(e); onFail(e);
} }
}
else if (existing instanceof Cancelled)
{
((Cancelled)existing).apply(callback);
}
else
{
Throwable cause = cancellation.get();
if (cause == null)
cause = new CancellationException();
existing.failed(cause);
}
return true; return true;
} }
}
}
/** /**
* Call to signal that a read is now possible. * Call to signal that a read is now possible.
@ -97,17 +168,19 @@ public abstract class FillInterest
*/ */
public boolean fillable() public boolean fillable()
{ {
if (LOG.isDebugEnabled()) while (true)
LOG.debug("fillable {}", this);
Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null))
{ {
Callback callback = _interested.get();
if (callback == null)
return false;
if (_interested.compareAndSet(callback, null))
{
if (LOG.isDebugEnabled())
LOG.debug("fillable {} {}",this, callback);
callback.succeeded(); callback.succeeded();
return true; return true;
} }
if (LOG.isDebugEnabled()) }
LOG.debug("{} lost race {}", this, callback);
return false;
} }
/** /**
@ -115,7 +188,8 @@ public abstract class FillInterest
*/ */
public boolean isInterested() public boolean isInterested()
{ {
return _interested.get() != null; Callback callback = _interested.get();
return callback != null && !(callback instanceof Cancelled);
} }
public InvocationType getCallbackInvocationType() public InvocationType getCallbackInvocationType()
@ -132,24 +206,37 @@ public abstract class FillInterest
*/ */
public boolean onFail(Throwable cause) public boolean onFail(Throwable cause)
{ {
if (LOG.isDebugEnabled()) while (true)
LOG.debug("onFail " + this, cause);
Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null))
{ {
Callback callback = _interested.get();
if (callback == null)
return false;
if (_interested.compareAndSet(callback, null))
{
if (LOG.isDebugEnabled())
LOG.debug("onFail {} {}",this, callback, cause);
callback.failed(cause); callback.failed(cause);
return true; return true;
} }
return false; }
} }
public void onClose() public void onClose()
{ {
if (LOG.isDebugEnabled()) while (true)
LOG.debug("onClose {}", this); {
Callback callback = _interested.get(); Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null)) if (callback == null)
callback.failed(new ClosedChannelException()); return;
if (_interested.compareAndSet(callback, null))
{
ClosedChannelException cause = new ClosedChannelException();
if (LOG.isDebugEnabled())
LOG.debug("onFail {} {}",this, callback, cause);
callback.failed(cause);
return;
}
}
} }
@Override @Override
@ -171,4 +258,36 @@ public abstract class FillInterest
* @throws IOException if unable to fulfill interest in fill * @throws IOException if unable to fulfill interest in fill
*/ */
protected abstract void needsFillInterest() throws IOException; protected abstract void needsFillInterest() throws IOException;
private static class Cancelled implements Callback
{
private final AtomicReference<Object> _result = new AtomicReference<>();
@Override
public void succeeded()
{
_result.compareAndSet(null, Boolean.TRUE);
}
@Override
public void failed(Throwable x)
{
_result.compareAndSet(null, x == null ? new Exception() : x);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
void apply(Callback callback)
{
Object result = _result.get();
if (result == Boolean.TRUE)
callback.succeeded();
else if (result instanceof Throwable)
callback.failed((Throwable)result);
}
}
} }

View File

@ -376,6 +376,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override @Override
public void onCompleted() public void onCompleted()
{ {
boolean complete = _input.consumeAll();
getEndPoint().cancelFillInterest(_input::getError);
// Handle connection upgrades // Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{ {
@ -409,7 +412,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_parser.close(); _parser.close();
} }
// else abort if we can't consume all // else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll()) else if (_generator.isPersistent() && !complete)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser); LOG.debug("unconsumed input {} {}", this, _parser);

View File

@ -720,12 +720,17 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
produceContent(); produceContent();
if (_content == null && _intercepted == null && _inputQ.isEmpty()) if (_content == null && _intercepted == null && _inputQ.isEmpty())
{
_state = EARLY_EOF;
_inputQ.notify();
return false; return false;
} }
}
catch (Throwable e) catch (Throwable e)
{ {
LOG.debug(e); LOG.debug(e);
_state = new ErrorState(e); _state = new ErrorState(e);
_inputQ.notify();
return false; return false;
} }
} }
@ -740,6 +745,15 @@ public class HttpInput extends ServletInputStream implements Runnable
} }
} }
public Throwable getError()
{
synchronized (_inputQ)
{
Throwable error = _state instanceof ErrorState ? ((ErrorState)_state)._error : null;
return error == null ? new IOException() : error;
}
}
public boolean isAsync() public boolean isAsync()
{ {
synchronized (_inputQ) synchronized (_inputQ)

View File

@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException; import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
@ -805,6 +806,12 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory
_endp.fillInterested(callback); _endp.fillInterested(callback);
} }
@Override
public Throwable cancelFillInterest(Supplier<Throwable> cancellation)
{
return _endp.cancelFillInterest(cancellation);
}
@Override @Override
public boolean flush(ByteBuffer... buffer) throws IOException public boolean flush(ByteBuffer... buffer) throws IOException
{ {

View File

@ -1811,6 +1811,8 @@ public class Request implements HttpServletRequest
*/ */
public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) public void setMetaData(org.eclipse.jetty.http.MetaData.Request request)
{ {
if (_metaData == null)
_input.recycle();
_metaData = request; _metaData = request;
setMethod(request.getMethod()); setMethod(request.getMethod());
@ -1879,7 +1881,7 @@ public class Request implements HttpServletRequest
getHttpChannelState().recycle(); getHttpChannelState().recycle();
_requestAttributeListeners.clear(); _requestAttributeListeners.clear();
_input.recycle(); // Defer _input.recycle() until setMetaData on next request, so that late readers will fail
_metaData = null; _metaData = null;
_originalURI = null; _originalURI = null;
_contextPath = null; _contextPath = null;

View File

@ -0,0 +1,398 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BlockingTest
{
private Server server;
ServerConnector connector;
private ContextHandler context;
@BeforeEach
void setUp()
{
server = new Server();
connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
context = new ContextHandler("/ctx");
HandlerList handlers = new HandlerList();
handlers.setHandlers(new Handler[]{context, new DefaultHandler()});
server.setHandler(handlers);
}
@AfterEach
void tearDown() throws Exception
{
server.stop();
}
@Test
public void testBlockingReadThenNormalComplete() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("OK"));
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testNormalCompleteThenBlockingRead() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("OK"));
completed.countDown();
Thread.sleep(1000);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
AsyncContext async = request.startAsync();
async.setTimeout(100);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(500));
assertThat(response.getContent(), containsString("AsyncContext timeout"));
completed.countDown();
Thread.sleep(1000);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testBlockingReadThenSendError() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.sendError(499);
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(499));
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
}