Fixes #11016 - Jetty 12 IllegalStateException when stopping Server wi… (#11017)

* Made ServletChannel error handling more robust.
A failure in error handling is now remembered so that the Handler callback can be failed later.
* Avoid failing the Handler callback from ServletChannel.abort(), as it is too early: should be failed when processing the TERMINATED state, similarly to when it is succeeded.
* Removed dead code from HttpConnection.SendCallback.reset(), since response is always non-null.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-12-15 15:09:20 +01:00 committed by GitHub
parent 3167e0cb6e
commit eb1e9eb8c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 210 additions and 63 deletions

View File

@ -1501,9 +1501,12 @@ public class HttpChannelState implements HttpChannel, Components
Throwable unconsumed = stream.consumeAvailable();
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);
ChannelResponse response = httpChannelState._response;
if (LOG.isDebugEnabled())
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", httpChannelState._stream.isCommitted(), httpChannelState._response.isCommitted(), this);
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);
// There may have been an attempt to write an error response that failed.
// Do not try to write again an error response if already committed.
if (!stream.isCommitted())
errorResponse = new ErrorResponse(request);
}

View File

@ -741,26 +741,18 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
_lastContent = last;
_callback = callback;
_header = null;
if (getConnector().isShutdown())
_generator.setPersistent(false);
return true;
}
if (isClosed() && response == null && last && content == null)
else
{
callback.succeeded();
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}
LOG.warn("reset failed {}", this);
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}
@Override

View File

@ -350,7 +350,7 @@ public class ErrorHandler implements Request.Handler
}
// Do an asynchronous completion.
baseRequest.getServletChannel().sendResponseAndComplete();
baseRequest.getServletChannel().sendErrorResponseAndComplete();
}
protected void handleErrorPage(HttpServletRequest request, Writer writer, int code, String message) throws IOException

View File

@ -470,7 +470,7 @@ public class ServletChannel
// If we can't have a body or have no ErrorHandler, then create a minimal error response.
if (HttpStatus.hasNoBody(getServletContextResponse().getStatus()) || errorHandler == null)
{
sendResponseAndComplete();
sendErrorResponseAndComplete();
}
else
{
@ -485,7 +485,7 @@ public class ServletChannel
// that ignores existing failures. However, the error handler needs to be able to call servlet pages,
// so it will need to do a new call to associate(req,res,callback) or similar, to make the servlet request and
// response wrap the error request and response. Have to think about what callback is passed.
errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(_state::errorHandlingComplete));
errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(() -> _state.errorHandlingComplete(null), _state::errorHandlingComplete));
}
}
catch (Throwable x)
@ -495,23 +495,16 @@ public class ServletChannel
else
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
if (LOG.isDebugEnabled())
LOG.debug("Could not perform ERROR dispatch, aborting", cause);
LOG.debug("Could not perform error handling, aborting", cause);
if (_state.isResponseCommitted())
{
abort(cause);
// Perform the same behavior as when the callback is failed.
_state.errorHandlingComplete(cause);
}
else
{
try
{
getServletContextResponse().resetContent();
sendResponseAndComplete();
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(cause, t);
abort(cause);
}
getServletContextResponse().resetContent();
sendErrorResponseAndComplete();
}
}
finally
@ -684,7 +677,7 @@ public class ServletChannel
return null;
}
public void sendResponseAndComplete()
public void sendErrorResponseAndComplete()
{
try
{
@ -694,6 +687,7 @@ public class ServletChannel
catch (Throwable x)
{
abort(x);
_state.completed(x);
}
}
@ -742,10 +736,13 @@ public class ServletChannel
_servletContextRequest.setAttribute(CustomRequestLog.LOG_DETAIL, logDetail);
}
// Callback will either be succeeded here or failed in abort().
// Callback is completed only here.
Callback callback = _callback;
if (_state.completeResponse())
Throwable failure = _state.completeResponse();
if (failure == null)
callback.succeeded();
else
callback.failed(failure);
}
public boolean isCommitted()
@ -783,13 +780,8 @@ public class ServletChannel
*/
public void abort(Throwable failure)
{
// Callback will either be failed here or succeeded in onCompleted().
if (_state.abortResponse())
{
if (LOG.isDebugEnabled())
LOG.debug("abort {}", this, failure);
_callback.failed(failure);
}
// Callback will be failed in onCompleted().
_state.abort(failure);
}
private void dispatch() throws Exception

View File

@ -151,6 +151,7 @@ public class ServletChannelState
private long _timeoutMs = DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
private Thread _onTimeoutThread;
private Throwable _failure;
private boolean _failureListener;
protected ServletChannelState(ServletChannel servletChannel)
@ -293,19 +294,19 @@ public class ServletChannelState
}
}
public boolean completeResponse()
public Throwable completeResponse()
{
try (AutoLock ignored = lock())
{
switch (_outputState)
{
case OPEN:
_outputState = OutputState.COMPLETED;
return true;
// This method is called when the state machine
// is about to terminate the processing, just
// before completing the Handler's callback.
assert _outputState == OutputState.OPEN || _failure != null;
default:
return false;
}
if (_outputState == OutputState.OPEN)
_outputState = OutputState.COMPLETED;
return _failure;
}
}
@ -322,7 +323,7 @@ public class ServletChannelState
}
}
public boolean abortResponse()
private boolean abortResponse(Throwable failure)
{
try (AutoLock ignored = lock())
{
@ -332,18 +333,34 @@ public class ServletChannelState
case ABORTED:
return false;
case OPEN:
_servletChannel.getServletContextResponse().setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
_outputState = OutputState.ABORTED;
return true;
default:
_outputState = OutputState.ABORTED;
_failure = failure;
return true;
}
}
}
public void abort(Throwable failure)
{
boolean handle = false;
try (AutoLock ignored = lock())
{
boolean aborted = abortResponse(failure);
if (LOG.isDebugEnabled())
LOG.debug("abort={} {}", aborted, this, failure);
if (aborted)
{
handle = _state == State.WAITING;
if (handle)
_state = State.WOKEN;
_requestState = RequestState.COMPLETED;
}
}
if (handle)
scheduleDispatch();
}
/**
* @return Next handling of the request should proceed
*/
@ -555,7 +572,12 @@ public class ServletChannelState
}
}
public void errorHandling()
/**
* Called when an asynchronous call to {@code ErrorHandler.handle()} is about to happen.
*
* @see #errorHandlingComplete(Throwable)
*/
void errorHandling()
{
try (AutoLock ignored = lock())
{
@ -565,17 +587,29 @@ public class ServletChannelState
}
}
public void errorHandlingComplete()
/**
* Called when the {@code Callback} passed to {@code ErrorHandler.handle()} is completed.
*
* @param failure the failure reported by the error handling,
* or {@code null} if there was no failure
*/
void errorHandlingComplete(Throwable failure)
{
boolean handle;
try (AutoLock ignored = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("errorHandlingComplete {}", toStringLocked());
LOG.debug("errorHandlingComplete {}", toStringLocked(), failure);
handle = _state == State.WAITING;
if (handle)
_state = State.WOKEN;
// If there is a failure while trying to
// handle a previous failure, just bail out.
if (failure != null)
abortResponse(failure);
if (_requestState == RequestState.ERRORING)
_requestState = RequestState.COMPLETE;
}

View File

@ -15,23 +15,30 @@ package org.eclipse.jetty.ee10.servlet;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncServletLongPollTest
@ -65,7 +72,7 @@ public class AsyncServletLongPollTest
@Test
public void testSuspendedRequestCompletedByAnotherRequest() throws Exception
{
final CountDownLatch asyncLatch = new CountDownLatch(1);
CountDownLatch asyncLatch = new CountDownLatch(1);
prepare(new HttpServlet()
{
private volatile AsyncContext asyncContext;
@ -93,7 +100,7 @@ public class AsyncServletLongPollTest
if (param != null)
error = Integer.parseInt(param);
final AsyncContext asyncContext = this.asyncContext;
AsyncContext asyncContext = this.asyncContext;
if (asyncContext != null)
{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
@ -152,4 +159,56 @@ public class AsyncServletLongPollTest
assertEquals(200, response3.getStatus());
}
}
@Test
public void testSuspendedRequestThenServerStop() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
prepare(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
{
// Suspend the request.
AsyncContext asyncContext = request.startAsync();
asyncContextRef.set(asyncContext);
}
@Override
public void destroy()
{
// Try to write an error response when shutting down.
AsyncContext asyncContext = asyncContextRef.get();
try
{
HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
catch (IOException x)
{
throw new RuntimeException(x);
}
finally
{
asyncContext.complete();
}
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
HttpTester.Request request = HttpTester.newRequest();
request.setURI(uri);
client.write(request.generate());
await().atMost(5, TimeUnit.SECONDS).until(asyncContextRef::get, Matchers.notNullValue());
server.stop();
client.socket().setSoTimeout(1000);
// The connection has been closed, no response.
HttpTester.Response response = HttpTester.parseResponse(client);
assertNull(response);
}
}
}

View File

@ -40,6 +40,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>

View File

@ -39,6 +39,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>

View File

@ -15,27 +15,32 @@ package org.eclipse.jetty.ee9.servlet;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled // TODO
public class AsyncServletLongPollTest
{
private Server server;
@ -66,7 +71,7 @@ public class AsyncServletLongPollTest
@Test
public void testSuspendedRequestCompletedByAnotherRequest() throws Exception
{
final CountDownLatch asyncLatch = new CountDownLatch(1);
CountDownLatch asyncLatch = new CountDownLatch(1);
prepare(new HttpServlet()
{
private volatile AsyncContext asyncContext;
@ -94,7 +99,7 @@ public class AsyncServletLongPollTest
if (param != null)
error = Integer.parseInt(param);
final AsyncContext asyncContext = this.asyncContext;
AsyncContext asyncContext = this.asyncContext;
if (asyncContext != null)
{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
@ -153,4 +158,56 @@ public class AsyncServletLongPollTest
assertEquals(200, response3.getStatus());
}
}
@Test
public void testSuspendedRequestThenServerStop() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
prepare(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
{
// Suspend the request.
AsyncContext asyncContext = request.startAsync();
asyncContextRef.set(asyncContext);
}
@Override
public void destroy()
{
// Try to write an error response when shutting down.
AsyncContext asyncContext = asyncContextRef.get();
try
{
HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
catch (IOException x)
{
throw new RuntimeException(x);
}
finally
{
asyncContext.complete();
}
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
HttpTester.Request request = HttpTester.newRequest();
request.setURI(uri);
client.write(request.generate());
await().atMost(5, TimeUnit.SECONDS).until(asyncContextRef::get, Matchers.notNullValue());
server.stop();
client.socket().setSoTimeout(1000);
// The connection has been closed, no response.
HttpTester.Response response = HttpTester.parseResponse(client);
assertNull(response);
}
}
}