Improve shutdown of non-persistent HTTP/1 connections #12212 (#12216)

* Improve shutdown of non-persistent HTTP/1 connections

 + shutdown in SendCallback

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

---------

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2024-09-03 07:58:16 +10:00 committed by GitHub
parent 5439f17ff6
commit 551710e9bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 90 additions and 23 deletions

View File

@ -318,16 +318,17 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
@Override
public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
if (isRequestBufferEmpty())
{
releaseRequestBuffer();
return null;
}
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
}
return null;
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
@ -341,10 +342,10 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (_retainableByteBuffer.release())
RetainableByteBuffer buffer = _retainableByteBuffer;
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
if (!buffer.release())
throw new IllegalStateException("unreleased buffer " + buffer);
}
}
@ -369,7 +370,9 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
HttpConnection last = setCurrentConnection(this);
try
{
while (getEndPoint().isOpen())
// We must loop until we fill -1 or there is an async pause in handling.
// Note that the endpoint might already be closed in some special circumstances.
while (true)
{
// Fill the request buffer (if needed).
int filled = fillRequestBuffer();
@ -906,6 +909,13 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
@Override
protected void onCompleteSuccess()
{
// If we are a non-persistent connection and have succeeded the last write...
if (_shutdownOut && !(_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection))
{
// then we shutdown the output here so that the client sees the body termination ASAP and
// cannot be delayed by any further server handling before the stream callback is completed.
getEndPoint().shutdownOutput();
}
release().succeeded();
}
@ -1513,8 +1523,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
return;
}
Connection upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
if (upgradeConnection != null)
if (_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection upgradeConnection)
{
getEndPoint().upgrade(upgradeConnection);
_httpChannel.recycle();
@ -1523,13 +1532,8 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
return;
}
// As this is not an upgrade, we can shutdown the output if we know we are not persistent
if (_sendCallback._shutdownOut)
getEndPoint().shutdownOutput();
_httpChannel.recycle();
// If a 100 Continue is still expected to be sent, but no content was read, then
// close the parser so that seeks EOF below, not the next request.
if (_expects100Continue)

View File

@ -52,11 +52,15 @@ import org.eclipse.jetty.server.internal.HttpConnection;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,6 +72,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -1085,6 +1090,56 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
}
@Test
public void testCloseWhileCompletePending() throws Exception
{
String content = "The End!\r\n";
CountDownLatch handleComplete = new CountDownLatch(1);
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
FutureCallback writeComplete = new FutureCallback();
Content.Sink.write(response, true, content, writeComplete);
// Wait until the write is complete
writeComplete.get(30, TimeUnit.SECONDS);
// Wait until test lets the handling complete
assertTrue(handleComplete.await(30, TimeUnit.SECONDS));
callback.succeeded();
return true;
}
});
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream output = client.getOutputStream();
output.write("""
GET / HTTP/1.1\r
Host: localhost:%d\r
Connection: close\r
\r
""".formatted(_serverURI.getPort())
.getBytes());
output.flush();
client.setSoTimeout(5000);
long start = NanoTime.now();
HttpTester.Input input = HttpTester.from(client.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(content, response.getContent());
assertFalse(input.isEOF());
assertEquals(-1, input.fillBuffer());
assertTrue(input.isEOF());
assertThat(NanoTime.secondsSince(start), lessThan(5L));
}
handleComplete.countDown();
}
@Test
public void testBigBlocks() throws Exception
{
@ -1813,8 +1868,9 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
}
@Test
public void testHoldContent() throws Exception
@ParameterizedTest
@ValueSource(booleans = {false /* TODO, true */})
public void testHoldContent(boolean close) throws Exception
{
Queue<Content.Chunk> contents = new ConcurrentLinkedQueue<>();
final int bufferSize = 1024;
@ -1857,6 +1913,10 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
response.setStatus(200);
if (close)
request.getConnectionMetaData().getConnection().getEndPoint().close();
callback.succeeded();
return true;
}
@ -1897,10 +1957,13 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
out.flush();
// check the response
if (!close)
{
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
}
}
assertTrue(closed.await(10, TimeUnit.SECONDS));