* Fixes #11679 - Jetty 12.0.8 seems to leak connection when it encounters earlyEOF. * Changed HttpConnection.RequestHandler.earlyEOF() to produce EofException instead of BadMessageException, as it is more appropriate. * Changed handling of HttpChannelState.onFailure() to not fail the write side unless there is a pending write callback. * Early EOF events now produce a EofException that is also an HttpException. * Now failures only impact pending writes, so that it would be possible to write an HTTP error response. --------- Signed-off-by: Simone Bordet <simone.bordet@gmail.com> Co-authored-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
parent
8e456c4ee5
commit
724273be09
|
@ -363,7 +363,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
|
||||
// If a write call is pending, take the writeCallback to fail below.
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(t, false);
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(t);
|
||||
|
||||
// If there was a pending IO operation, deliver the idle timeout via them.
|
||||
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
|
||||
|
@ -432,7 +432,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_onContentAvailable = null;
|
||||
|
||||
// If a write call is in progress, take the writeCallback to fail below.
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(x, true);
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(x);
|
||||
|
||||
// Notify the failure listeners only once.
|
||||
Consumer<Throwable> onFailure = _onFailure;
|
||||
|
@ -1120,20 +1120,17 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
return _writeCallback != null;
|
||||
}
|
||||
|
||||
private Runnable lockedFailWrite(Throwable x, boolean fatal)
|
||||
private Runnable lockedFailWrite(Throwable x)
|
||||
{
|
||||
assert _request._lock.isHeldByCurrentThread();
|
||||
Callback writeCallback = _writeCallback;
|
||||
_writeCallback = null;
|
||||
if (writeCallback != null || fatal)
|
||||
{
|
||||
if (_writeFailure == null)
|
||||
_writeFailure = x;
|
||||
else
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
|
||||
}
|
||||
if (writeCallback == null)
|
||||
return null;
|
||||
if (_writeFailure == null)
|
||||
_writeFailure = x;
|
||||
else
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
|
||||
return () -> HttpChannelState.failed(writeCallback, x);
|
||||
}
|
||||
|
||||
|
|
|
@ -1057,7 +1057,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
|||
HttpStreamOverHTTP1 stream = _stream.get();
|
||||
if (stream != null)
|
||||
{
|
||||
BadMessageException bad = new BadMessageException("Early EOF");
|
||||
HttpEofException bad = new HttpEofException();
|
||||
Content.Chunk chunk = stream._chunk;
|
||||
|
||||
if (Content.Chunk.isFailure(chunk))
|
||||
|
@ -1640,4 +1640,28 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
|||
return HttpConnection.this.getEndPoint();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HttpParser converts some bad message event into early EOF.
|
||||
* However, we want to send a 400 (not a 500) to the client because it's a client error.
|
||||
*/
|
||||
private static class HttpEofException extends EofException implements HttpException
|
||||
{
|
||||
private HttpEofException()
|
||||
{
|
||||
super("Early EOF");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCode()
|
||||
{
|
||||
return HttpStatus.BAD_REQUEST_400;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getReason()
|
||||
{
|
||||
return getMessage();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.eclipse.jetty.server.internal.HttpChannelState;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.SerializedInvoker;
|
||||
|
@ -1229,10 +1228,11 @@ public class HttpChannelTest
|
|||
rq.demand(demand::countDown);
|
||||
assertThat(demand.getCount(), is(1L));
|
||||
|
||||
FuturePromise<Throwable> callback = new FuturePromise<>();
|
||||
// Write callback not serialized until after the onFailure task runs.
|
||||
handling.get().write(false, null, Callback.from(() -> {}, callback::succeeded));
|
||||
Callback.Completable callback = new Callback.Completable();
|
||||
// Writes are possible, unless a pending write is failed.
|
||||
handling.get().write(false, null, callback);
|
||||
assertTrue(callback.isDone());
|
||||
assertFalse(callback.isCompletedExceptionally());
|
||||
|
||||
// Run the onFailure task.
|
||||
try (StacklessLogging ignore = new StacklessLogging(Response.class))
|
||||
|
@ -1244,8 +1244,6 @@ public class HttpChannelTest
|
|||
assertThat(error.get(), sameInstance(failure));
|
||||
// Demand callback was called.
|
||||
assertTrue(demand.await(5, TimeUnit.SECONDS));
|
||||
// Write callback was failed.
|
||||
assertThat(callback.get(5, TimeUnit.SECONDS), sameInstance(failure));
|
||||
|
||||
// Request handling was completed.
|
||||
assertTrue(stream.isComplete());
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.ee10.servlet;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import jakarta.servlet.AsyncContext;
|
||||
import jakarta.servlet.ServletException;
|
||||
import jakarta.servlet.ServletInputStream;
|
||||
import jakarta.servlet.ServletOutputStream;
|
||||
import jakarta.servlet.http.HttpServlet;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpTester;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class EarlyEOFTest
|
||||
{
|
||||
private Server server;
|
||||
private ServerConnector connector;
|
||||
|
||||
private void start(HttpServlet servlet) throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
connector = new ServerConnector(server);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler("/ctx");
|
||||
context.addServlet(servlet, "/path/*");
|
||||
server.setHandler(context);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void dispose() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEarlyEOF() throws Exception
|
||||
{
|
||||
start(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
|
||||
ServletInputStream input = request.getInputStream();
|
||||
assertEquals('0', input.read());
|
||||
|
||||
// Early EOF.
|
||||
assertThrows(EofException.class, input::read);
|
||||
|
||||
// Must be able to send a response.
|
||||
response.setStatus(HttpStatus.ACCEPTED_202);
|
||||
ServletOutputStream output = response.getOutputStream();
|
||||
output.print("out");
|
||||
output.close();
|
||||
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
|
||||
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
|
||||
{
|
||||
String request = """
|
||||
POST /ctx/path/early HTTP/1.1\r
|
||||
Host: localhost\r
|
||||
Content-Length: 10\r
|
||||
|
||||
0""";
|
||||
channel.write(UTF_8.encode(request));
|
||||
// Close output before sending the whole content.
|
||||
channel.shutdownOutput();
|
||||
|
||||
HttpTester.Response response = HttpTester.parseResponse(channel);
|
||||
|
||||
assertThat(response.getStatus(), is(HttpStatus.ACCEPTED_202));
|
||||
assertTrue(response.contains(HttpHeader.CONNECTION, "close"));
|
||||
assertEquals("out", response.getContent());
|
||||
|
||||
// Connection must be closed by server.
|
||||
assertEquals(-1, channel.read(ByteBuffer.allocate(512)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue