Backport of #11267.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-01-12 11:02:53 +01:00
parent b86957c76e
commit b953871c9a
4 changed files with 99 additions and 30 deletions

View File

@ -14,7 +14,11 @@
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -39,7 +43,10 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -53,7 +60,9 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -749,6 +758,53 @@ public class IdleTimeoutTest extends AbstractTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testIdleTimeoutWhenCongested() throws Exception
{
long idleTimeout = 1000;
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(new HttpConfiguration());
prepareServer(h2c);
server.removeConnector(connector);
connector = new ServerConnector(server, 1, 1, h2c)
{
@Override
protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
{
SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler())
{
@Override
public boolean flush(ByteBuffer... buffers)
{
// Fake TCP congestion.
return false;
}
@Override
protected void onIncompleteFlush()
{
// Do nothing here to avoid spin loop,
// since the network is actually writable,
// as we are only faking TCP congestion.
}
};
endpoint.setIdleTimeout(getIdleTimeout());
return endpoint;
}
};
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.start();
prepareClient();
client.start();
InetSocketAddress address = new InetSocketAddress("localhost", connector.getLocalPort());
// The connect() will complete exceptionally.
client.connect(address, new Session.Listener.Adapter());
await().atMost(Duration.ofMillis(5 * idleTimeout)).until(() -> connector.getConnectedEndPoints().size(), is(0));
}
private void sleep(long value)
{
try

View File

@ -1909,6 +1909,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
String reason = "idle_timeout";
boolean notify = false;
boolean terminate = false;
boolean sendGoAway = false;
GoAwayFrame goAwayFrame = null;
Throwable cause = null;
@ -1952,11 +1953,22 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
if (LOG.isDebugEnabled())
LOG.debug("Already closed, ignored idle timeout for {}", HTTP2Session.this);
return false;
// Writes may be TCP congested, so termination never happened.
terminate = true;
goAwayFrame = goAwaySent;
if (goAwayFrame == null)
goAwayFrame = goAwayRecv;
break;
}
}
}
if (terminate)
{
terminate(goAwayFrame);
return false;
}
if (notify)
{
boolean confirmed = notifyIdleTimeout(HTTP2Session.this);

View File

@ -216,8 +216,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
String reason = "go_away";
failStreams(stream -> true, error, reason, true);
terminate();
outwardDisconnect(error, reason);
terminateAndDisconnect(error, reason);
}
return CompletableFuture.completedFuture(null);
}
@ -494,18 +493,12 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
goAwaySent = newGoAwayFrame(false);
GoAwayFrame goAwayFrame = goAwaySent;
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() ->
{
terminate();
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
}));
terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away")
));
}
else
{
zeroStreamsAction = () ->
{
terminate();
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
};
zeroStreamsAction = () -> terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
failStreams = true;
}
}
@ -566,6 +559,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public boolean onIdleTimeout()
{
boolean notify = false;
boolean terminate = false;
try (AutoLock ignored = lock.lock())
{
switch (closeState)
@ -583,9 +577,8 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
case CLOSING:
case CLOSED:
{
if (LOG.isDebugEnabled())
LOG.debug("already closed, ignored idle timeout for {}", this);
return false;
terminate = true;
break;
}
default:
{
@ -594,6 +587,14 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
}
}
if (terminate)
{
if (LOG.isDebugEnabled())
LOG.debug("already closed, ignored idle timeout for {}", this);
terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "idle_timeout");
return false;
}
boolean confirmed = true;
if (notify)
confirmed = notifyIdleTimeout();
@ -650,18 +651,15 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
failStreams(stream -> true, error, reason, true);
if (goAwayFrame != null)
{
writeControlFrame(goAwayFrame, Callback.from(() ->
{
terminate();
outwardDisconnect(error, reason);
}));
}
writeControlFrame(goAwayFrame, Callback.from(() -> terminateAndDisconnect(error, reason)));
else
{
terminate();
outwardDisconnect(error, reason);
}
terminateAndDisconnect(error, reason);
}
private void terminateAndDisconnect(long error, String reason)
{
terminate();
outwardDisconnect(error, reason);
}
/**

View File

@ -398,11 +398,14 @@ public abstract class QuicSession extends ContainerLifeCycle
public void outwardClose(long error, String reason)
{
boolean closed = quicheConnection.close(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
quicheConnection.close(error, reason);
// Flushing will eventually forward the outward close to the connection.
flush();
LOG.debug("outward closing ({}) 0x{}/{} on {}", closed, Long.toHexString(error), reason, this);
if (closed)
{
// Flushing will eventually forward the outward close to the connection.
flush();
}
}
private void finishOutwardClose(Throwable failure)