Issue #6728 - QUIC and HTTP/3
- More fixes and improvement to HTTP client transport tests. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
aac4232e20
commit
baab1a15b9
|
@ -24,7 +24,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
|
||||||
{
|
{
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(LeakTrackingConnectionPool.class);
|
private static final Logger LOG = LoggerFactory.getLogger(LeakTrackingConnectionPool.class);
|
||||||
|
|
||||||
private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>()
|
private final LeakDetector<Connection> leakDetector = new LeakDetector<>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void leaked(LeakInfo leakInfo)
|
protected void leaked(LeakInfo leakInfo)
|
||||||
|
@ -35,7 +35,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
|
||||||
|
|
||||||
public LeakTrackingConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
public LeakTrackingConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||||
{
|
{
|
||||||
super((HttpDestination)destination, maxConnections, requester);
|
super(destination, maxConnections, requester);
|
||||||
addBean(leakDetector);
|
addBean(leakDetector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
|
||||||
LOG.info("Connection {}@{} released but not acquired", connection, leakDetector.id(connection));
|
LOG.info("Connection {}@{} released but not acquired", connection, leakDetector.id(connection));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
protected void leaked(LeakDetector<Connection>.LeakInfo leakInfo)
|
||||||
{
|
{
|
||||||
LOG.info("Connection {} leaked at:", leakInfo.getResourceDescription(), leakInfo.getStackFrames());
|
LOG.info("Connection {} leaked at:", leakInfo.getResourceDescription(), leakInfo.getStackFrames());
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.eclipse.jetty.client.HttpClient;
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
import org.eclipse.jetty.client.HttpClientTransport;
|
import org.eclipse.jetty.client.HttpClientTransport;
|
||||||
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
||||||
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
||||||
import org.eclipse.jetty.http.HttpScheme;
|
import org.eclipse.jetty.http.HttpScheme;
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
|
@ -71,7 +72,7 @@ public abstract class AbstractHttpClientServerTest
|
||||||
transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
protected void leaked(LeakDetector<Connection>.LeakInfo leakInfo)
|
||||||
{
|
{
|
||||||
connectionLeaks.incrementAndGet();
|
connectionLeaks.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.eclipse.jetty.quic.common.QuicConnection;
|
||||||
import org.eclipse.jetty.quic.common.QuicSession;
|
import org.eclipse.jetty.quic.common.QuicSession;
|
||||||
import org.eclipse.jetty.quic.quiche.QuicheConfig;
|
import org.eclipse.jetty.quic.quiche.QuicheConfig;
|
||||||
import org.eclipse.jetty.quic.quiche.QuicheConnection;
|
import org.eclipse.jetty.quic.quiche.QuicheConnection;
|
||||||
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
|
|
||||||
import org.eclipse.jetty.quic.server.internal.SimpleTokenMinter;
|
import org.eclipse.jetty.quic.server.internal.SimpleTokenMinter;
|
||||||
import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
|
import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
|
||||||
import org.eclipse.jetty.server.Connector;
|
import org.eclipse.jetty.server.Connector;
|
||||||
|
@ -70,8 +69,7 @@ public class ServerQuicConnection extends QuicConnection
|
||||||
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, new SimpleTokenValidator((InetSocketAddress)remoteAddress), cipherBuffer, remoteAddress);
|
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, new SimpleTokenValidator((InetSocketAddress)remoteAddress), cipherBuffer, remoteAddress);
|
||||||
if (quicheConnection == null)
|
if (quicheConnection == null)
|
||||||
{
|
{
|
||||||
// TODO make the buffer size configurable
|
ByteBuffer negotiationBuffer = byteBufferPool.acquire(getOutputBufferSize(), true);
|
||||||
ByteBuffer negotiationBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
|
|
||||||
int pos = BufferUtil.flipToFill(negotiationBuffer);
|
int pos = BufferUtil.flipToFill(negotiationBuffer);
|
||||||
// TODO make the token minter configurable
|
// TODO make the token minter configurable
|
||||||
if (!QuicheConnection.negotiate(new SimpleTokenMinter((InetSocketAddress)remoteAddress), cipherBuffer, negotiationBuffer))
|
if (!QuicheConnection.negotiate(new SimpleTokenMinter((InetSocketAddress)remoteAddress), cipherBuffer, negotiationBuffer))
|
||||||
|
|
|
@ -126,7 +126,9 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
|
||||||
{
|
{
|
||||||
init(transport);
|
init(transport);
|
||||||
|
|
||||||
int bufferSize = 512;
|
// A small buffer size so the response content is
|
||||||
|
// read in multiple buffers, but big enough for HTTP/3.
|
||||||
|
int bufferSize = 1536;
|
||||||
byte[] content = new byte[10 * bufferSize];
|
byte[] content = new byte[10 * bufferSize];
|
||||||
new Random().nextBytes(content);
|
new Random().nextBytes(content);
|
||||||
scenario.startServer(new EmptyServerHandler()
|
scenario.startServer(new EmptyServerHandler()
|
||||||
|
@ -140,7 +142,6 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
|
||||||
});
|
});
|
||||||
scenario.startClient(client ->
|
scenario.startClient(client ->
|
||||||
{
|
{
|
||||||
// A small buffer size so the response content is read in multiple buffers.
|
|
||||||
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
|
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
|
||||||
client.setResponseBufferSize(bufferSize);
|
client.setResponseBufferSize(bufferSize);
|
||||||
});
|
});
|
||||||
|
@ -287,7 +288,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
|
||||||
{
|
{
|
||||||
init(transport);
|
init(transport);
|
||||||
|
|
||||||
int bufferSize = 512;
|
int bufferSize = 1536;
|
||||||
byte[] content = new byte[10 * bufferSize];
|
byte[] content = new byte[10 * bufferSize];
|
||||||
new Random().nextBytes(content);
|
new Random().nextBytes(content);
|
||||||
scenario.startServer(new EmptyServerHandler()
|
scenario.startServer(new EmptyServerHandler()
|
||||||
|
|
|
@ -31,18 +31,17 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.eclipse.jetty.client.HttpClientTransport;
|
import org.eclipse.jetty.client.HttpClientTransport;
|
||||||
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
||||||
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.api.Response;
|
import org.eclipse.jetty.client.api.Response;
|
||||||
import org.eclipse.jetty.client.api.Result;
|
import org.eclipse.jetty.client.api.Result;
|
||||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
|
||||||
import org.eclipse.jetty.client.util.BytesRequestContent;
|
import org.eclipse.jetty.client.util.BytesRequestContent;
|
||||||
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
|
||||||
import org.eclipse.jetty.http.HttpHeader;
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||||
import org.eclipse.jetty.http.HttpMethod;
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
|
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
|
||||||
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.ClientConnector;
|
|
||||||
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
|
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
|
||||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||||
import org.eclipse.jetty.server.Connector;
|
import org.eclipse.jetty.server.Connector;
|
||||||
|
@ -368,58 +367,54 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||||
int selectors = Math.min(1, ProcessorUtils.availableProcessors() / 2);
|
int selectors = Math.min(1, ProcessorUtils.availableProcessors() / 2);
|
||||||
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
|
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
|
||||||
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
|
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
|
||||||
if (transport == Transport.UNIX_DOMAIN)
|
switch (transport)
|
||||||
{
|
{
|
||||||
UnixDomainServerConnector unixSocketConnector = new UnixDomainServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
|
case HTTP:
|
||||||
unixSocketConnector.setUnixDomainPath(unixDomainPath);
|
case HTTPS:
|
||||||
return unixSocketConnector;
|
case H2C:
|
||||||
|
case H2:
|
||||||
|
case FCGI:
|
||||||
|
return new ServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
|
||||||
|
case H3:
|
||||||
|
return new HTTP3ServerConnector(server, null, null, byteBufferPool, sslContextFactory, provideServerConnectionFactory(transport));
|
||||||
|
case UNIX_DOMAIN:
|
||||||
|
UnixDomainServerConnector unixSocketConnector = new UnixDomainServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
|
||||||
|
unixSocketConnector.setUnixDomainPath(unixDomainPath);
|
||||||
|
return unixSocketConnector;
|
||||||
|
default:
|
||||||
|
throw new IllegalStateException();
|
||||||
}
|
}
|
||||||
return new ServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HttpClientTransport provideClientTransport(Transport transport, SslContextFactory.Client sslContextFactory)
|
public HttpClientTransport provideClientTransport(Transport transport, SslContextFactory.Client sslContextFactory)
|
||||||
{
|
{
|
||||||
|
HttpClientTransport clientTransport = super.provideClientTransport(transport, sslContextFactory);
|
||||||
switch (transport)
|
switch (transport)
|
||||||
{
|
{
|
||||||
case HTTP:
|
case HTTP:
|
||||||
case HTTPS:
|
case HTTPS:
|
||||||
|
case FCGI:
|
||||||
case UNIX_DOMAIN:
|
case UNIX_DOMAIN:
|
||||||
{
|
{
|
||||||
ClientConnector clientConnector = new ClientConnector();
|
// Track connection leaking only for non-multiplexed transports.
|
||||||
clientConnector.setSelectors(1);
|
|
||||||
clientConnector.setSslContextFactory(sslContextFactory);
|
|
||||||
HttpClientTransport clientTransport = new HttpClientTransportOverHTTP(clientConnector);
|
|
||||||
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
protected void leaked(LeakDetector<Connection>.LeakInfo leakInfo)
|
||||||
{
|
{
|
||||||
super.leaked(leakInfo);
|
super.leaked(leakInfo);
|
||||||
connectionLeaks.incrementAndGet();
|
connectionLeaks.incrementAndGet();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return clientTransport;
|
break;
|
||||||
}
|
|
||||||
case FCGI:
|
|
||||||
{
|
|
||||||
HttpClientTransport clientTransport = new HttpClientTransportOverFCGI(1, "");
|
|
||||||
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
|
||||||
{
|
|
||||||
super.leaked(leakInfo);
|
|
||||||
connectionLeaks.incrementAndGet();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return clientTransport;
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
return super.provideClientTransport(transport, sslContextFactory);
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return clientTransport;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.IO;
|
import org.eclipse.jetty.util.IO;
|
||||||
|
import org.junit.jupiter.api.Assumptions;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||||
|
|
||||||
|
@ -890,6 +891,9 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
|
||||||
@ArgumentsSource(TransportProvider.class)
|
@ArgumentsSource(TransportProvider.class)
|
||||||
public void testUploadWithOutputStreamFailureToConnect(Transport transport) throws Exception
|
public void testUploadWithOutputStreamFailureToConnect(Transport transport) throws Exception
|
||||||
{
|
{
|
||||||
|
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
|
||||||
|
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
|
||||||
|
|
||||||
init(transport);
|
init(transport);
|
||||||
|
|
||||||
long connectTimeout = 1000;
|
long connectTimeout = 1000;
|
||||||
|
@ -972,6 +976,9 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
|
||||||
@ArgumentsSource(TransportProvider.class)
|
@ArgumentsSource(TransportProvider.class)
|
||||||
public void testUploadWithConnectFailureClosesStream(Transport transport) throws Exception
|
public void testUploadWithConnectFailureClosesStream(Transport transport) throws Exception
|
||||||
{
|
{
|
||||||
|
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
|
||||||
|
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
|
||||||
|
|
||||||
init(transport);
|
init(transport);
|
||||||
|
|
||||||
long connectTimeout = 1000;
|
long connectTimeout = 1000;
|
||||||
|
|
Loading…
Reference in New Issue