Fixes #11259 - HTTP/2 connection not closed after idle timeout when TCP congested. (#11267)

Now upon the second idle timeout, the connection is forcibly closed.
Fixed also similar problem in HTTP/3.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-01-15 17:29:05 +01:00 committed by GitHub
parent 090287db5e
commit 0839a208cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 385 additions and 66 deletions

View File

@ -1909,6 +1909,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
{
String reason = "idle_timeout";
boolean notify = false;
boolean terminate = false;
boolean sendGoAway = false;
GoAwayFrame goAwayFrame = null;
Throwable cause = null;
@ -1923,10 +1924,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
return false;
notify = true;
}
// Timed out while waiting for closing events, fail all the streams.
case LOCALLY_CLOSED ->
{
// Timed out while waiting for closing events, fail all the streams.
if (goAwaySent.isGraceful())
{
goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason);
@ -1935,7 +1935,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
goAwayFrame = goAwaySent;
closed = CloseState.CLOSING;
zeroStreamsAction = null;
failure = cause = new TimeoutException("Session idle timeout expired");
failure = cause = newTimeoutException();
}
case REMOTELY_CLOSED ->
{
@ -1944,17 +1944,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
goAwayFrame = goAwaySent;
closed = CloseState.CLOSING;
zeroStreamsAction = null;
failure = cause = new TimeoutException("Session idle timeout expired");
}
default ->
{
if (LOG.isDebugEnabled())
LOG.debug("Already closed, ignored idle timeout for {}", HTTP2Session.this);
return false;
failure = cause = newTimeoutException();
}
default -> terminate = true;
}
}
if (terminate)
{
if (LOG.isDebugEnabled())
LOG.debug("Already closed, ignored idle timeout for {}", HTTP2Session.this);
// Writes may be TCP congested, so termination never happened.
flusher.abort(newTimeoutException());
return false;
}
if (notify)
{
boolean confirmed = notifyIdleTimeout(HTTP2Session.this);
@ -1973,6 +1977,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
return false;
}
private TimeoutException newTimeoutException()
{
return new TimeoutException("Session idle timeout expired");
}
private void onSessionFailure(int error, String reason, Callback callback)
{
GoAwayFrame goAwayFrame;
@ -2036,7 +2045,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame)
{
sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame)));
sendGoAway(frame, Callback.from(() -> terminate(eventFrame)));
}
private void sendGoAway(GoAwayFrame frame, Callback callback)

View File

@ -13,7 +13,11 @@
package org.eclipse.jetty.http2.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -33,8 +37,11 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
@ -50,6 +57,7 @@ import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -747,10 +755,10 @@ public class IdleTimeoutTest extends AbstractTest
await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0));
// Wait for the server to finish serving requests.
await().atMost(5, TimeUnit.SECONDS).until(handled::get, Matchers.is(0));
assertThat(requests.get(), Matchers.is(count - 1));
await().atMost(5, TimeUnit.SECONDS).until(handled::get, is(0));
assertThat(requests.get(), is(count - 1));
await().atMost(5, TimeUnit.SECONDS).until(responses::get, Matchers.is(count - 1));
await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count - 1));
}
@Test
@ -837,6 +845,53 @@ public class IdleTimeoutTest extends AbstractTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testIdleTimeoutWhenCongested() throws Exception
{
long idleTimeout = 1000;
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(new HttpConfiguration());
prepareServer(h2c);
server.removeConnector(connector);
connector = new ServerConnector(server, 1, 1, h2c)
{
@Override
protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
{
SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler())
{
@Override
public boolean flush(ByteBuffer... buffers)
{
// Fake TCP congestion.
return false;
}
@Override
protected void onIncompleteFlush()
{
// Do nothing here to avoid spin loop,
// since the network is actually writable,
// as we are only faking TCP congestion.
}
};
endpoint.setIdleTimeout(getIdleTimeout());
return endpoint;
}
};
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.start();
prepareClient();
httpClient.start();
InetSocketAddress address = new InetSocketAddress("localhost", connector.getLocalPort());
// The connect() will complete exceptionally.
http2Client.connect(address, new Session.Listener() {});
await().atMost(Duration.ofMillis(5 * idleTimeout)).until(() -> connector.getConnectedEndPoints().size(), is(0));
}
private void sleep(long value)
{
try

View File

@ -218,8 +218,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
String reason = "go_away";
failStreams(stream -> true, error, reason, true, new ClosedChannelException());
terminate();
outwardDisconnect(error, reason);
terminateAndDisconnect(error, reason);
}
return CompletableFuture.completedFuture(null);
}
@ -489,18 +488,12 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
goAwaySent = newGoAwayFrame(false);
GoAwayFrame goAwayFrame = goAwaySent;
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() ->
{
terminate();
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
}));
terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away")
));
}
else
{
zeroStreamsAction = () ->
{
terminate();
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
};
zeroStreamsAction = () -> terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
failStreams = true;
}
}
@ -561,34 +554,24 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public boolean onIdleTimeout()
{
boolean notify = false;
boolean terminate = false;
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
case NOT_CLOSED:
{
notify = true;
break;
}
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
break;
}
case CLOSING:
case CLOSED:
{
if (LOG.isDebugEnabled())
LOG.debug("already closed, ignored idle timeout for {}", this);
return false;
}
default:
{
throw new IllegalStateException();
}
case NOT_CLOSED -> notify = true;
case CLOSING, CLOSED -> terminate = true;
}
}
if (terminate)
{
if (LOG.isDebugEnabled())
LOG.debug("already closed, ignored idle timeout for {}", this);
terminateAndDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "idle_timeout");
return false;
}
boolean confirmed = true;
if (notify)
confirmed = notifyIdleTimeout();
@ -645,18 +628,15 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
failStreams(stream -> true, error, reason, true, new IOException(reason));
if (goAwayFrame != null)
{
writeControlFrame(goAwayFrame, Callback.from(() ->
{
terminate();
outwardDisconnect(error, reason);
}));
}
writeControlFrame(goAwayFrame, Callback.from(() -> terminateAndDisconnect(error, reason)));
else
{
terminate();
outwardDisconnect(error, reason);
}
terminateAndDisconnect(error, reason);
}
private void terminateAndDisconnect(long error, String reason)
{
terminate();
outwardDisconnect(error, reason);
}
/**

View File

@ -0,0 +1,148 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.server.ServerQuicConnection;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(WorkDirExtension.class)
public class IdleTimeoutTest
{
private Server server;
private HTTP3Client http3Client;
@BeforeEach
public void prepare()
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(http3Client);
LifeCycle.stop(server);
}
@Test
public void testIdleTimeoutWhenCongested(WorkDir workDir) throws Exception
{
long idleTimeout = 1000;
AtomicBoolean established = new AtomicBoolean();
CountDownLatch disconnectLatch = new CountDownLatch(1);
RawHTTP3ServerConnectionFactory h3 = new RawHTTP3ServerConnectionFactory(new HttpConfiguration(), new Session.Server.Listener()
{
@Override
public void onAccept(Session session)
{
established.set(true);
}
@Override
public void onDisconnect(Session session, long error, String reason)
{
disconnectLatch.countDown();
}
});
CountDownLatch closeLatch = new CountDownLatch(1);
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
HTTP3ServerConnector connector = new HTTP3ServerConnector(server, sslContextFactory, h3)
{
@Override
protected ServerQuicConnection newConnection(EndPoint endpoint)
{
return new ServerQuicConnection(this, endpoint)
{
@Override
protected ServerQuicSession newQuicSession(SocketAddress remoteAddress, QuicheConnection quicheConnection)
{
return new ServerQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, getQuicServerConnector())
{
@Override
public int flush(long streamId, ByteBuffer buffer, boolean last) throws IOException
{
if (established.get())
return 0;
return super.flush(streamId, buffer, last);
}
@Override
public void outwardClose(long error, String reason)
{
closeLatch.countDown();
super.outwardClose(error, reason);
}
};
}
};
}
};
connector.getQuicConfiguration().setPemWorkDirectory(workDir.getEmptyPathDir());
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.start();
http3Client = new HTTP3Client();
http3Client.getClientConnector().setSslContextFactory(new SslContextFactory.Client(true));
http3Client.start();
Session.Client session = http3Client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("GET", HttpURI.from("http://localhost:" + connector.getLocalPort() + "/path"), HttpVersion.HTTP_3, HttpFields.EMPTY);
// The request will complete exceptionally.
session.newRequest(new HeadersFrame(request, true), new Stream.Client.Listener() {});
assertTrue(closeLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(disconnectLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
}

View File

@ -398,11 +398,15 @@ public abstract class QuicSession extends ContainerLifeCycle
public void outwardClose(long error, String reason)
{
boolean closed = quicheConnection.close(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
quicheConnection.close(error, reason);
// Flushing will eventually forward the outward close to the connection.
flush();
LOG.debug("outward closing ({}) 0x{}/{} on {}", closed, Long.toHexString(error), reason, this);
if (closed)
{
// Flushing will eventually forward
// the outward close to the connection.
flush();
}
}
private void finishOutwardClose(Throwable failure)

View File

@ -305,6 +305,16 @@ public class QuicServerConnector extends AbstractNetworkConnector
throw new UnsupportedOperationException(getClass().getSimpleName() + " has no accept mechanism");
}
protected EndPoint newEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
return new DatagramChannelEndPoint(channel, selector, selectionKey, getScheduler());
}
protected ServerQuicConnection newConnection(EndPoint endpoint)
{
return new ServerQuicConnection(QuicServerConnector.this, endpoint);
}
private class ServerDatagramSelectorManager extends SelectorManager
{
protected ServerDatagramSelectorManager(Executor executor, Scheduler scheduler, int selectors)
@ -315,7 +325,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
EndPoint endPoint = new DatagramChannelEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
EndPoint endPoint = QuicServerConnector.this.newEndPoint((DatagramChannel)channel, selector, selectionKey);
endPoint.setIdleTimeout(getIdleTimeout());
return endPoint;
}
@ -323,7 +333,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
ServerQuicConnection connection = new ServerQuicConnection(QuicServerConnector.this, endpoint);
ServerQuicConnection connection = QuicServerConnector.this.newConnection(endpoint);
connection.addEventListener(container);
connection.setInputBufferSize(getInputBufferSize());
connection.setOutputBufferSize(getOutputBufferSize());

View File

@ -45,13 +45,18 @@ public class ServerQuicConnection extends QuicConnection
private final QuicServerConnector connector;
private final SessionTimeouts sessionTimeouts;
protected ServerQuicConnection(QuicServerConnector connector, EndPoint endPoint)
public ServerQuicConnection(QuicServerConnector connector, EndPoint endPoint)
{
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.connector = connector;
this.sessionTimeouts = new SessionTimeouts(connector.getScheduler());
}
public QuicServerConnector getQuicServerConnector()
{
return connector;
}
@Override
public void onOpen()
{
@ -87,13 +92,18 @@ public class ServerQuicConnection extends QuicConnection
}
else
{
QuicSession session = new ServerQuicSession(getExecutor(), getScheduler(), bufferPool, quicheConnection, this, remoteAddress, connector);
ServerQuicSession session = newQuicSession(remoteAddress, quicheConnection);
// Send the response packet(s) that tryAccept() generated.
session.flush();
return session;
}
}
protected ServerQuicSession newQuicSession(SocketAddress remoteAddress, QuicheConnection quicheConnection)
{
return new ServerQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, getQuicServerConnector());
}
public void schedule(ServerQuicSession session)
{
sessionTimeouts.schedule(session);

View File

@ -46,7 +46,7 @@ public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Exp
private final Connector connector;
private long expireNanoTime = Long.MAX_VALUE;
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
public ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
{
super(executor, scheduler, bufferPool, quicheConnection, connection, remoteAddress);
this.connector = connector;

View File

@ -0,0 +1,103 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;
public class IdleTimeoutTest
{
private Server server;
private ServerConnector connector;
@BeforeEach
public void prepare()
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(server);
}
@Test
public void testIdleTimeoutWhenCongested() throws Exception
{
long idleTimeout = 1000;
HttpConnectionFactory h1 = new HttpConnectionFactory(new HttpConfiguration());
connector = new ServerConnector(server, 1, 1, h1)
{
@Override
protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
{
SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler())
{
@Override
public boolean flush(ByteBuffer... buffers)
{
// Fake TCP congestion.
return false;
}
@Override
protected void onIncompleteFlush()
{
// Do nothing here to avoid spin loop,
// since the network is actually writable,
// as we are only faking TCP congestion.
}
};
endpoint.setIdleTimeout(getIdleTimeout());
return endpoint;
}
};
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.start();
try (SocketChannel client = SocketChannel.open())
{
client.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
HttpTester.Request request = HttpTester.newRequest();
client.write(request.generate());
// The server never writes back anything, but should close the connection.
client.configureBlocking(false);
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
await().atMost(Duration.ofSeconds(5)).until(() -> client.read(inputBuffer), is(-1));
await().atMost(5, TimeUnit.SECONDS).until(() -> connector.getConnectedEndPoints().size(), is(0));
}
}
}