Issue #6728 - QUIC and HTTP/3

- Made sure Connection.Listeners are properly notified.
- Fixed removal of QuicStreamEndPoints from QuicSession in case of successful request/response exchanges.
- Avoid spurious wakeups by shutting down input after receiving a frame with last=true.
- Updated HttpClient transport tests to work with UNIX_DOMAIN.
- Started updating HttpClient transport tests to work with HTTP/3.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-25 09:42:42 +02:00
parent ae15c5cc63
commit 3585b408b7
36 changed files with 393 additions and 260 deletions

View File

@ -565,6 +565,9 @@ public class HttpClient extends ContainerLifeCycle
Map<String, Object> context = new ConcurrentHashMap<>();
context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
Origin.Protocol protocol = destination.getOrigin().getProtocol();
List<String> protocols = protocol != null ? protocol.getProtocols() : List.of("http/1.1");
context.put(ClientConnector.APPLICATION_PROTOCOLS_CONTEXT_KEY, protocols);
Origin.Address address = destination.getConnectAddress();
resolver.resolve(address.getHost(), address.getPort(), new Promise<>()

View File

@ -54,6 +54,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(http3Session.getSessionClient(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
return new ClientHTTP3StreamConnection(streamEndPoint, http3Session, parser);
ClientHTTP3StreamConnection connection = new ClientHTTP3StreamConnection(streamEndPoint, http3Session, parser);
return customize(connection, context);
}
}

View File

@ -43,9 +43,9 @@ public class ClientHTTP3Session extends ClientProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class);
private final HTTP3SessionClient session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionClient session;
private final ControlFlusher controlFlusher;
private final MessageFlusher messageFlusher;
@ -60,7 +60,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
addBean(encoder);
@ -68,7 +68,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxResponseHeadersSize());
addBean(decoder);
@ -76,7 +76,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
addBean(controlFlusher);
if (LOG.isDebugEnabled())
@ -97,7 +97,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
@Override
protected void initialize()
protected void onStart()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
@ -109,21 +109,28 @@ public class ClientHTTP3Session extends ClientProtocolSession
controlFlusher.iterate();
}
@Override
protected void onStop()
{
// Nothing to do, not even calling super,
// as onStart() does not call super either.
}
private void fail(Throwable failure)
{
// TODO: must close the connection.
}
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
private QuicStreamEndPoint openInstructionEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::opened);
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
private QuicStreamEndPoint openControlEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::opened);
}
@Override
@ -138,7 +145,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::openUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
@ -175,12 +182,11 @@ public class ClientHTTP3Session extends ClientProtocolSession
session.onClose(error, reason);
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
private void openUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
endPoint.opened();
}
void writeControlFrame(Frame frame, Callback callback)
@ -191,7 +197,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
void writeMessageFrame(long streamId, Frame frame, Callback callback)
{
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::openProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
}

View File

@ -262,7 +262,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (LOG.isDebugEnabled())
LOG.debug("new request stream #{} with {} on {}", streamId, frame, this);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::openProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
promise.whenComplete((s, x) ->
@ -283,15 +283,15 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
{
if (listener == null)
endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
stream.updateClose(frame.isLast(), true);
promise.succeeded(stream);
}
else
{
removeStream(stream);
removeStream(stream, x);
promise.failed(x);
}
});
stream.updateClose(frame.isLast(), true);
return promise;
}
@ -350,7 +350,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
return streams.get(streamId);
}
public void removeStream(HTTP3Stream stream)
public void removeStream(HTTP3Stream stream, Throwable failure)
{
boolean removed = streams.remove(stream.getId()) != null;
if (removed)
@ -358,6 +358,10 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (LOG.isDebugEnabled())
LOG.debug("destroyed {}", stream);
// Do not call HTTP3Stream.reset() or QuicStreamEndPoint.close(...),
// as we do not want to send a RESET_STREAM frame to the other peer.
getProtocolSession().getQuicSession().remove(stream.getEndPoint(), failure);
if (streamCount.decrementAndGet() == 0)
tryRunZeroStreamsAction();
}
@ -813,7 +817,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (stream != null)
{
stream.onFailure(failure);
removeStream(stream);
removeStream(stream, failure);
}
}
@ -875,8 +879,9 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
@Override
protected boolean onExpired(HTTP3Stream stream)
{
if (stream.onIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")))
removeStream(stream);
TimeoutException timeout = new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed");
if (stream.onIdleTimeout(timeout))
removeStream(stream, timeout);
// The iterator returned from the method above does not support removal.
return false;
}

View File

@ -137,17 +137,25 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
Promise.Completable<Stream> completable = writeFrame(frame);
updateClose(frame.isLast(), true);
return completable;
return write(frame);
}
@Override
public CompletableFuture<Stream> data(DataFrame frame)
{
Promise.Completable<Stream> completable = writeFrame(frame);
updateClose(frame.isLast(), true);
return completable;
return write(frame);
}
private CompletableFuture<Stream> write(Frame frame)
{
return writeFrame(frame)
.whenComplete((s, x) ->
{
if (x == null)
updateClose(Frame.isLast(frame), true);
else
session.removeStream(this, x);
});
}
@Override
@ -181,7 +189,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
{
if (!frame.isLast())
throw new IllegalArgumentException("invalid trailer frame: property 'last' must be true");
return writeFrame(frame);
return write(frame);
}
public boolean hasDemand()
@ -312,7 +320,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
public void onFailure(Throwable failure)
{
notifyFailure(failure);
session.removeStream(this);
session.removeStream(this, failure);
}
private void notifyFailure(Throwable failure)
@ -377,7 +385,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
if (!local)
{
closeState = CloseState.CLOSED;
session.removeStream(this);
session.removeStream(this, null);
}
break;
}
@ -386,7 +394,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
if (local)
{
closeState = CloseState.CLOSED;
session.removeStream(this);
session.removeStream(this, null);
}
break;
}
@ -408,7 +416,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
if (LOG.isDebugEnabled())
LOG.debug("resetting {} with error 0x{} {}", this, Long.toHexString(error), failure.toString());
closeState = CloseState.CLOSED;
session.removeStream(this);
session.removeStream(this, failure);
endPoint.close(error, failure);
}

View File

@ -389,7 +389,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
remotelyClosed = frame.isLast();
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
{
@ -400,10 +399,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
else
{
// Trailer.
remotelyClosed = true;
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
if (frame.isLast())
shutdownInput();
super.onHeaders(streamId, frame);
}
@ -413,9 +413,21 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (dataFrame != null)
throw new IllegalStateException();
dataFrame = frame;
dataLast = frame.isLast();
remotelyClosed = frame.isLast();
if (frame.isLast())
{
dataLast = true;
shutdownInput();
}
super.onData(streamId, frame);
}
private void shutdownInput()
{
remotelyClosed = true;
// We want to shutdown the input to avoid "spurious" wakeups where
// zero bytes could be spuriously read from the EndPoint after the
// stream is remotely closed by receiving a frame with last=true.
getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
}
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
@ -30,13 +29,17 @@ import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.HTTP3ClientConnectionFactory;
import org.eclipse.jetty.http3.client.http.internal.SessionClientListener;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicSession;
public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport
public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport implements ProtocolSession.Factory
{
private final HTTP3ClientConnectionFactory factory = new HTTP3ClientConnectionFactory();
private final HTTP3Client client;
public HttpClientTransportOverHTTP3(HTTP3Client client)
@ -112,8 +115,14 @@ public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
return null;
return factory.newProtocolSession(quicSession, context);
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return factory.newConnection(endPoint, context);
}
}

View File

@ -294,6 +294,21 @@ public class HttpChannelOverHTTP3 extends HttpChannel
@Override
protected boolean eof()
{
HttpInput.Content content = this.content;
if (content == null)
{
this.content = new HttpInput.EofContent();
}
else
{
if (!content.isEof())
{
if (content.remaining() == 0)
this.content = new HttpInput.EofContent();
else
throw new IllegalStateException();
}
}
return false;
}
}

View File

@ -42,9 +42,9 @@ public class ServerHTTP3Session extends ServerProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class);
private final HTTP3SessionServer session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionServer session;
private final ControlFlusher controlFlusher;
private final MessageFlusher messageFlusher;
@ -59,7 +59,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
addBean(encoder);
@ -67,7 +67,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxRequestHeadersSize());
addBean(decoder);
@ -75,7 +75,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
addBean(controlFlusher);
if (LOG.isDebugEnabled())
@ -96,9 +96,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
@Override
protected void doStart() throws Exception
protected void onStart()
{
super.doStart();
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)
@ -114,16 +113,16 @@ public class ServerHTTP3Session extends ServerProtocolSession
// TODO: must close the connection.
}
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
private QuicStreamEndPoint openInstructionEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::opened);
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
private QuicStreamEndPoint openControlEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::opened);
}
@Override
@ -138,7 +137,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::openUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
@ -175,12 +174,11 @@ public class ServerHTTP3Session extends ServerProtocolSession
session.onClose(error, reason);
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
private void openUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
endPoint.opened();
}
void writeControlFrame(Frame frame, Callback callback)
@ -191,7 +189,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
void writeMessageFrame(long streamId, Frame frame, Callback callback)
{
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::openProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
}

View File

@ -89,6 +89,7 @@ public class AbstractClientServerTest
protected void startClient() throws Exception
{
http3Client = new HTTP3Client();
http3Client.getQuicConfiguration().setVerifyPeerCertificates(false);
httpClient = new HttpClient(new HttpClientTransportDynamic(new ClientConnectionFactoryOverHTTP3.HTTP3(http3Client)));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");

View File

@ -27,11 +27,15 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.QuicSession;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -89,12 +93,14 @@ public class ClientServerTest extends AbstractClientServerTest
@Test
public void testGETThenResponseWithoutContent() throws Exception
{
AtomicReference<HTTP3Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverRequestLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverSessionRef.set((HTTP3Session)stream.getSession());
serverRequestLatch.countDown();
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
@ -103,11 +109,11 @@ public class ClientServerTest extends AbstractClientServerTest
}
});
Session.Client session = newSession(new Session.Client.Listener() {});
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest("/"), true);
Stream stream = session.newRequest(frame, new Stream.Listener()
Stream stream = clientSession.newRequest(frame, new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
@ -120,6 +126,18 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
HTTP3Session serverSession = serverSessionRef.get();
assertTrue(serverSession.getStreams().isEmpty());
assertTrue(clientSession.getStreams().isEmpty());
QuicSession serverQuicSession = serverSession.getProtocolSession().getQuicSession();
assertTrue(serverQuicSession.getQuicStreamEndPoints().stream()
.noneMatch(endPoint -> endPoint.getStreamId() == stream.getId()));
ClientQuicSession clientQuicSession = clientSession.getProtocolSession().getQuicSession();
assertTrue(clientQuicSession.getQuicStreamEndPoints().stream()
.noneMatch(endPoint -> endPoint.getStreamId() == stream.getId()));
}
@Test
@ -194,11 +212,13 @@ public class ClientServerTest extends AbstractClientServerTest
@ValueSource(ints = {1024, 10 * 1024, 100 * 1024, 1000 * 1024})
public void testEchoRequestContentAsResponseContent(int length) throws Exception
{
AtomicReference<HTTP3Session> serverSessionRef = new AtomicReference<>();
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverSessionRef.set((HTTP3Session)stream.getSession());
// Send the response headers.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
stream.demand();
@ -225,7 +245,7 @@ public class ClientServerTest extends AbstractClientServerTest
}
});
Session.Client session = newSession(new Session.Client.Listener() {});
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest("/"), false);
@ -234,7 +254,7 @@ public class ClientServerTest extends AbstractClientServerTest
byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Stream stream = session.newRequest(frame, new Stream.Listener()
Stream stream = clientSession.newRequest(frame, new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
@ -266,6 +286,18 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDataLatch.await(15, TimeUnit.SECONDS));
assertArrayEquals(bytesSent, bytesReceived);
HTTP3Session serverSession = serverSessionRef.get();
assertTrue(serverSession.getStreams().isEmpty());
assertTrue(clientSession.getStreams().isEmpty());
QuicSession serverQuicSession = serverSession.getProtocolSession().getQuicSession();
assertTrue(serverQuicSession.getQuicStreamEndPoints().stream()
.noneMatch(endPoint -> endPoint.getStreamId() == stream.getId()));
ClientQuicSession clientQuicSession = clientSession.getProtocolSession().getQuicSession();
assertTrue(clientQuicSession.getQuicStreamEndPoints().stream()
.noneMatch(endPoint -> endPoint.getStreamId() == stream.getId()));
}
@Test

View File

@ -24,11 +24,6 @@
<artifactId>quic-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>

View File

@ -13,7 +13,6 @@
module org.eclipse.jetty.quic.client
{
requires org.eclipse.jetty.client;
requires org.eclipse.jetty.quic.quiche;
requires org.slf4j;

View File

@ -25,6 +25,7 @@ public class ClientProtocolSession extends ProtocolSession
private static final Logger LOG = LoggerFactory.getLogger(ClientProtocolSession.class);
private final Runnable producer = Invocable.from(Invocable.InvocationType.NON_BLOCKING, this::produce);
private QuicStreamEndPoint endPoint;
public ClientProtocolSession(ClientQuicSession session)
{
@ -41,15 +42,30 @@ public class ClientProtocolSession extends ProtocolSession
protected void doStart() throws Exception
{
super.doStart();
initialize();
onStart();
}
protected void initialize()
protected void onStart()
{
// Create a single bidirectional, client-initiated,
// QUIC stream that plays the role of the TCP stream.
long streamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
endPoint = getOrCreateStreamEndPoint(streamId, this::openProtocolEndPoint);
}
@Override
protected void doStop() throws Exception
{
onStop();
super.doStop();
}
protected void onStop()
{
QuicStreamEndPoint endPoint = this.endPoint;
if (endPoint != null)
endPoint.closed(null);
this.endPoint = null;
}
@Override

View File

@ -23,8 +23,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
@ -71,17 +69,15 @@ public class ClientQuicConnection extends QuicConnection
List<String> protocols = quicConfiguration.getProtocols();
if (protocols == null || protocols.isEmpty())
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
if (destination != null)
protocols = destination.getOrigin().getProtocol().getProtocols();
if (protocols == null)
protocols = (List<String>)context.get(ClientConnector.APPLICATION_PROTOCOLS_CONTEXT_KEY);
if (protocols == null || protocols.isEmpty())
throw new IllegalStateException("Missing ALPN protocols");
}
QuicheConfig quicheConfig = new QuicheConfig();
quicheConfig.setApplicationProtos(protocols.toArray(String[]::new));
quicheConfig.setDisableActiveMigration(true);
quicheConfig.setVerifyPeer(false);
quicheConfig.setDisableActiveMigration(quicConfiguration.isDisableActiveMigration());
quicheConfig.setVerifyPeer(quicConfiguration.isVerifyPeerCertificates());
// Idle timeouts must not be managed by Quiche.
quicheConfig.setMaxIdleTimeout(0L);
quicheConfig.setInitialMaxData((long)quicConfiguration.getSessionRecvWindow());

View File

@ -45,6 +45,8 @@ public class QuicClientConnectorConfigurator extends ClientConnector.Configurato
// Initialize to sane defaults for a client.
configuration.setSessionRecvWindow(16 * 1024 * 1024);
configuration.setBidirectionalStreamRecvWindow(8 * 1024 * 1024);
configuration.setDisableActiveMigration(true);
configuration.setVerifyPeerCertificates(true);
}
public QuicConfiguration getQuicConfiguration()

View File

@ -87,7 +87,9 @@ public class End2EndClientTest
ClientConnectionFactory.Info http1Info = HttpClientConnectionFactory.HTTP11;
ClientConnectionFactoryOverHTTP2.HTTP2 http2Info = new ClientConnectionFactoryOverHTTP2.HTTP2(new HTTP2Client());
HttpClientTransportDynamic transport = new HttpClientTransportDynamic(new ClientConnector(new QuicClientConnectorConfigurator()), http1Info, http2Info);
QuicClientConnectorConfigurator configurator = new QuicClientConnectorConfigurator();
configurator.getQuicConfiguration().setVerifyPeerCertificates(false);
HttpClientTransportDynamic transport = new HttpClientTransportDynamic(new ClientConnector(configurator), http1Info, http2Info);
client = new HttpClient(transport);
client.start();
}

View File

@ -110,12 +110,11 @@ public abstract class ProtocolSession extends ContainerLifeCycle
protected abstract boolean onReadable(long readableStreamId);
public void configureProtocolEndPoint(QuicStreamEndPoint endPoint)
public void openProtocolEndPoint(QuicStreamEndPoint endPoint)
{
Connection connection = getQuicSession().newConnection(endPoint);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
endPoint.opened();
}
protected boolean onIdleTimeout()

View File

@ -20,6 +20,8 @@ public class QuicConfiguration
public static final String CONTEXT_KEY = QuicConfiguration.class.getName();
private List<String> protocols = List.of();
private boolean disableActiveMigration;
private boolean verifyPeerCertificates;
private int maxBidirectionalRemoteStreams;
private int maxUnidirectionalRemoteStreams;
private int sessionRecvWindow;
@ -36,6 +38,26 @@ public class QuicConfiguration
this.protocols = protocols;
}
public boolean isDisableActiveMigration()
{
return disableActiveMigration;
}
public void setDisableActiveMigration(boolean disableActiveMigration)
{
this.disableActiveMigration = disableActiveMigration;
}
public boolean isVerifyPeerCertificates()
{
return verifyPeerCertificates;
}
public void setVerifyPeerCertificates(boolean verifyPeerCertificates)
{
this.verifyPeerCertificates = verifyPeerCertificates;
}
public int getMaxBidirectionalRemoteStreams()
{
return maxBidirectionalRemoteStreams;

View File

@ -246,9 +246,15 @@ public abstract class QuicSession extends ContainerLifeCycle
flush();
}
public void onClose(long streamId)
public void remove(QuicStreamEndPoint endPoint, Throwable failure)
{
endPoints.remove(streamId);
boolean removed = endPoints.remove(endPoint.getStreamId()) != null;
if (removed)
{
if (LOG.isDebugEnabled())
LOG.debug("removed {} from {}", endPoint, this);
endPoint.closed(failure);
}
}
public SocketAddress getLocalAddress()

View File

@ -50,6 +50,24 @@ public class QuicStreamEndPoint extends AbstractEndPoint
this.streamId = streamId;
}
public void opened()
{
if (LOG.isDebugEnabled())
LOG.debug("opened {}", this);
Connection connection = getConnection();
if (connection != null)
connection.onOpen();
}
public void closed(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("closed {}", this);
Connection connection = getConnection();
if (connection != null)
connection.onClose(failure);
}
public QuicSession getQuicSession()
{
return session;
@ -83,13 +101,13 @@ public class QuicStreamEndPoint extends AbstractEndPoint
{
shutdownInput();
if (LOG.isDebugEnabled())
LOG.debug("shutting down input of stream #{} with error 0x{}", streamId, Long.toHexString(error));
LOG.debug("shutting down input with error 0x{} on {}", Long.toHexString(error), this);
session.shutdownInput(streamId, error);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("error shutting down input of stream #{} with error 0x{}", streamId, Long.toHexString(error), x);
LOG.debug("error shutting down input with error 0x{} on {}", Long.toHexString(error), this, x);
}
}
@ -99,13 +117,13 @@ public class QuicStreamEndPoint extends AbstractEndPoint
{
shutdownOutput();
if (LOG.isDebugEnabled())
LOG.debug("shutting down output of stream #{} with error 0x{}", streamId, Long.toHexString(error));
LOG.debug("shutting down output with error 0x{} on {}", Long.toHexString(error), this);
session.shutdownOutput(streamId, error);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("error shutting down output of stream #{} with error 0x{}", streamId, Long.toHexString(error), x);
LOG.debug("error shutting down output with error 0x{} on {}", Long.toHexString(error), this, x);
}
}
@ -125,10 +143,10 @@ public class QuicStreamEndPoint extends AbstractEndPoint
else
writeFlusher.onFail(failure);
session.onClose(streamId);
session.remove(this, failure);
if (LOG.isDebugEnabled())
LOG.debug("closed stream #{} with error 0x{}", streamId, Long.toHexString(error), failure);
LOG.debug("closed with error 0x{} {}", Long.toHexString(error), this, failure);
}
@Override
@ -142,7 +160,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public int fill(ByteBuffer buffer) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("filling buffer from stream {} finished={}", streamId, isStreamFinished());
LOG.debug("filling buffer finished={} from {}", isStreamFinished(), this);
int pos = BufferUtil.flipToFill(buffer);
int drained = session.fill(streamId, buffer);
BufferUtil.flipToFlush(buffer, pos);
@ -160,22 +178,22 @@ public class QuicStreamEndPoint extends AbstractEndPoint
if (last)
--length;
if (LOG.isDebugEnabled())
LOG.debug("flushing {} buffer(s) to stream {}", length, streamId);
LOG.debug("flushing {} buffer(s) to {}", length, this);
for (int i = 0; i < length; ++i)
{
ByteBuffer buffer = buffers[i];
int flushed = session.flush(streamId, buffer, (i == length - 1) && last);
if (LOG.isDebugEnabled())
LOG.debug("flushed {} bytes to stream {} window={}/{}", flushed, streamId, session.getWindowCapacity(streamId), session.getWindowCapacity());
LOG.debug("flushed {} bytes window={}/{} to {}", flushed, session.getWindowCapacity(streamId), session.getWindowCapacity(), this);
if (buffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("incomplete flushing of stream {}", streamId);
LOG.debug("incomplete flushing of {}", this);
return false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("flushed stream {}", streamId);
LOG.debug("flushed {}", this);
return true;
}

View File

@ -75,6 +75,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
// One bidirectional stream to simulate the TCP stream, and no unidirectional streams.
quicConfiguration.setMaxBidirectionalRemoteStreams(1);
quicConfiguration.setMaxUnidirectionalRemoteStreams(0);
quicConfiguration.setVerifyPeerCertificates(true);
}
public QuicConfiguration getQuicConfiguration()
@ -157,7 +158,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
quicheConfig.setPrivKeyPemPath(pemFiles[0].getPath());
quicheConfig.setCertChainPemPath(pemFiles[1].getPath());
quicheConfig.setVerifyPeer(false);
quicheConfig.setVerifyPeer(quicConfiguration.isVerifyPeerCertificates());
// Idle timeouts must not be managed by Quiche.
quicheConfig.setMaxIdleTimeout(0L);
quicheConfig.setInitialMaxData((long)quicConfiguration.getSessionRecvWindow());

View File

@ -26,7 +26,7 @@ public class ServerProtocolSession extends ProtocolSession
private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class);
private final Runnable producer = Invocable.from(Invocable.InvocationType.BLOCKING, this::produce);
private final Consumer<QuicStreamEndPoint> configureProtocolEndPoint = this::configureProtocolEndPoint;
private final Consumer<QuicStreamEndPoint> configureProtocolEndPoint = this::openProtocolEndPoint;
public ServerProtocolSession(ServerQuicSession session)
{
@ -39,6 +39,28 @@ public class ServerProtocolSession extends ProtocolSession
return (ServerQuicSession)super.getQuicSession();
}
@Override
protected void doStart() throws Exception
{
super.doStart();
onStart();
}
protected void onStart()
{
}
@Override
protected void doStop() throws Exception
{
onStop();
super.doStop();
}
protected void onStop()
{
}
@Override
public Runnable getProducer()
{

View File

@ -58,6 +58,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
@ManagedObject
public class UnixDomainServerConnector extends AbstractConnector
{
public static final int MAX_UNIX_DOMAIN_PATH_LENGTH = 107;
private final AtomicReference<Closeable> acceptor = new AtomicReference<>();
private final SelectorManager selectorManager;
private ServerSocketChannel serverChannel;

View File

@ -87,13 +87,18 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-unixsocket-client</artifactId>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-http-client-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-unixsocket-server</artifactId>
<artifactId>jetty-unixdomain-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.params.ParameterizedTest;
@ -116,6 +115,7 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
{
case HTTP:
case HTTPS:
case UNIX_DOMAIN:
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
@ -123,7 +123,7 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
return new HttpClientTransportOverHTTP(clientConnector)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return new HttpConnectionOverHTTP(endPoint, context)
{
@ -173,6 +173,11 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
};
}
case H3:
{
// TODO:
throw new UnsupportedOperationException();
}
case FCGI:
{
ClientConnector clientConnector = new ClientConnector();
@ -201,31 +206,6 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
};
}
case UNIX_SOCKET:
{
return new HttpClientTransportOverUnixSockets(scenario.sockFile.toString())
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
public boolean associate(HttpExchange exchange)
{
return code.test(exchange) && super.associate(exchange);
}
};
}
};
}
};
}
default:
{
throw new IllegalArgumentException();

View File

@ -49,8 +49,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
import org.eclipse.jetty.unixsocket.server.UnixSocketConnector;
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.ProcessorUtils;
@ -156,14 +155,14 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
int factor = (logger.isDebugEnabled() ? 25 : 1) * 100;
// Dumps the state of the client if the test takes too long
final Thread testThread = Thread.currentThread();
Thread testThread = Thread.currentThread();
Scheduler.Task task = scenario.client.getScheduler().schedule(() ->
{
logger.warn("Interrupting test, it is taking too long{}{}{}{}",
System.lineSeparator(), scenario.server.dump(),
System.lineSeparator(), scenario.client.dump());
testThread.interrupt();
}, iterations * factor, TimeUnit.MILLISECONDS);
}, (long)iterations * factor, TimeUnit.MILLISECONDS);
long begin = System.nanoTime();
for (int i = 0; i < iterations; ++i)
@ -175,7 +174,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
long end = System.nanoTime();
task.cancel();
long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000L / elapsed : -1);
for (String failure : failures)
{
@ -185,7 +184,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
assertTrue(failures.isEmpty(), failures.toString());
}
private void test(final CountDownLatch latch, final List<String> failures)
private void test(CountDownLatch latch, List<String> failures)
{
ThreadLocalRandom random = ThreadLocalRandom.current();
// Choose a random destination
@ -196,12 +195,8 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
boolean ssl = scenario.transport.isTlsBased();
// Choose randomly whether to close the connection on the client or on the server
boolean clientClose = false;
if (!ssl && random.nextInt(100) < 5)
clientClose = true;
boolean serverClose = false;
if (!ssl && random.nextInt(100) < 5)
serverClose = true;
boolean clientClose = !ssl && random.nextInt(100) < 5;
boolean serverClose = !ssl && random.nextInt(100) < 5;
long clientTimeout = 0;
// if (!ssl && random.nextInt(100) < 5)
@ -213,10 +208,10 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures);
}
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, boolean checkContentLength, CountDownLatch latch, List<String> failures)
{
long requestId = requestCount.incrementAndGet();
Request request = scenario.client.newRequest(host, scenario.getNetworkConnectorLocalPortInt().orElse(0))
Request request = scenario.client.newRequest(host, scenario.getServerPort().orElse(0))
.scheme(scheme)
.path("/" + requestId)
.method(method);
@ -243,7 +238,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
break;
}
final CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch requestLatch = new CountDownLatch(1);
request.send(new Response.Listener.Adapter()
{
private final AtomicInteger contentLength = new AtomicInteger();
@ -317,7 +312,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
String timeout = request.getHeader("X-Timeout");
if (timeout != null)
sleep(2 * Integer.parseInt(timeout));
sleep(2L * Integer.parseInt(timeout));
String method = request.getMethod().toUpperCase(Locale.ENGLISH);
switch (method)
@ -373,10 +368,10 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
int selectors = Math.min(1, ProcessorUtils.availableProcessors() / 2);
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
if (transport == Transport.UNIX_SOCKET)
if (transport == Transport.UNIX_DOMAIN)
{
UnixSocketConnector unixSocketConnector = new UnixSocketConnector(server, null, null, byteBufferPool, selectors, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixSocket(sockFile.toString());
UnixDomainServerConnector unixSocketConnector = new UnixDomainServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixDomainPath(unixDomainPath);
return unixSocketConnector;
}
return new ServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
@ -389,6 +384,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
{
case HTTP:
case HTTPS:
case UNIX_DOMAIN:
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
@ -419,20 +415,6 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
});
return clientTransport;
}
case UNIX_SOCKET:
{
HttpClientTransportOverUnixSockets clientTransport = new HttpClientTransportOverUnixSockets(sockFile.toString());
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
});
return clientTransport;
}
default:
{
return super.provideClientTransport(transport, sslContextFactory);

View File

@ -899,8 +899,6 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
CountDownLatch latch = new CountDownLatch(1);
OutputStreamRequestContent content = new OutputStreamRequestContent();
String uri = "http://0.0.0.1";
if (scenario.getNetworkConnectorLocalPort().isPresent())
uri += ":" + scenario.getNetworkConnectorLocalPort().get();
scenario.client.newRequest(uri)
.scheme(scenario.getScheme())
.body(content)
@ -993,8 +991,6 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
CountDownLatch completeLatch = new CountDownLatch(1);
String uri = "http://0.0.0.1";
if (scenario.getNetworkConnectorLocalPort().isPresent())
uri += ":" + scenario.getNetworkConnectorLocalPort().get();
scenario.client.newRequest(uri)
.scheme(scenario.getScheme())
.body(content)

View File

@ -47,9 +47,9 @@ import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http3.client.http.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.util.Callback;
@ -364,11 +364,18 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
clientThreads.setName("client");
scenario.client.setExecutor(clientThreads);
scenario.client.start();
if (transport == Transport.H3)
{
Assumptions.assumeTrue(false, "certificate verification not yet supported in quic");
// TODO: the lines below should be enough, but they don't work. To be investigated.
HttpClientTransportOverHTTP3 http3Transport = (HttpClientTransportOverHTTP3)scenario.client.getTransport();
http3Transport.getHTTP3Client().getQuicConfiguration().setVerifyPeerCertificates(true);
}
assertThrows(ExecutionException.class, () ->
{
// Use an IP address not present in the certificate.
int serverPort = ((ServerConnector)scenario.connector).getLocalPort();
int serverPort = scenario.getServerPort().orElse(0);
scenario.client.newRequest("https://127.0.0.2:" + serverPort)
.timeout(5, TimeUnit.SECONDS)
.send();
@ -708,8 +715,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
// Test with a full URI.
String hostAddress = "::1";
String host = "[" + hostAddress + "]";
int port = Integer.parseInt(scenario.getNetworkConnectorLocalPort().get());
String uri = scenario.getScheme() + "://" + host + ":" + port + "/path";
String uri = scenario.newURI().replace("localhost", host) + "/path";
ContentResponse response = scenario.client.newRequest(uri)
.method(HttpMethod.PUT)
.timeout(5, TimeUnit.SECONDS)
@ -719,6 +725,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
assertThat(new String(response.getContent(), StandardCharsets.ISO_8859_1), Matchers.startsWith("[::1]:"));
// Test with host address.
int port = scenario.getServerPort().orElse(0);
response = scenario.client.newRequest(hostAddress, port)
.scheme(scenario.getScheme())
.method(HttpMethod.PUT)
@ -765,7 +772,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
if (transport.isTlsBased())
scenario.httpConfig.getCustomizer(SecureRequestCustomizer.class).setSniHostCheck(false);
Origin origin = new Origin(scenario.getScheme(), "localhost", scenario.getNetworkConnectorLocalPortInt().get());
Origin origin = new Origin(scenario.getScheme(), "localhost", scenario.getServerPort().orElse(0));
HttpDestination destination = scenario.client.resolveDestination(origin);
org.eclipse.jetty.client.api.Request request = scenario.client.newRequest(requestHost, requestPort)
@ -813,10 +820,8 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
});
String host = "localhost";
int port = scenario.getNetworkConnectorLocalPortInt().get();
assertThrows(TimeoutException.class, () ->
scenario.client.newRequest(host, port)
scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
.path("/1")
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
@ -825,7 +830,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
latch.countDown();
// Make another request without specifying the idle timeout, should not fail
ContentResponse response = scenario.client.newRequest(host, port)
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
.path("/2")
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)

View File

@ -50,12 +50,10 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opentest4j.TestAbortedException;
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -181,7 +179,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testTimeoutOnListenerWithExplicitConnection(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
long timeout = 1000;
@ -208,7 +205,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testTimeoutIsCancelledOnSuccessWithExplicitConnection(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
long timeout = 1000;
@ -287,7 +283,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testBlockingConnectTimeoutFailsRequest(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
testConnectTimeoutFailsRequest(true);
}
@ -296,7 +291,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testNonBlockingConnectTimeoutFailsRequest(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
testConnectTimeoutFailsRequest(false);
}
@ -332,7 +326,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testConnectTimeoutIsCancelledByShorterRequestTimeout(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
String host = "10.255.255.1";
@ -366,7 +359,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void retryAfterConnectTimeout(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
final String host = "10.255.255.1";
@ -421,15 +413,14 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testTimeoutCancelledWhenSendingThrowsException(Transport transport) throws Exception
{
assumeRealNetwork(transport);
init(transport);
scenario.start(new EmptyServerHandler());
long timeout = 1000;
String uri = "badscheme://0.0.0.1";
if (scenario.getNetworkConnectorLocalPort().isPresent())
uri += ":" + scenario.getNetworkConnectorLocalPort().get();
if (scenario.getServerPort().isPresent())
uri += ":" + scenario.getServerPort().getAsInt();
Request request = scenario.client.newRequest(uri);
// TODO: assert a more specific Throwable
@ -447,11 +438,6 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
assertNull(request.getAbortCause());
}
private void assumeRealNetwork(Transport transport)
{
Assumptions.assumeTrue(transport != UNIX_SOCKET);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testFirstRequestTimeoutAfterSecondRequestCompletes(Transport transport) throws Exception

View File

@ -100,8 +100,8 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int base = i % maxConnections;
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts.toString(), expected, Matchers.equalTo(candidate));
if (transport != Transport.UNIX_SOCKET && i > 0)
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts, expected, Matchers.equalTo(candidate));
if (transport != Transport.UNIX_DOMAIN && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ -195,7 +195,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts.toString(), expected, Matchers.equalTo(candidate));
if (transport != Transport.UNIX_SOCKET && i > 0)
if (transport != Transport.UNIX_DOMAIN && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}

View File

@ -52,7 +52,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
@ -261,8 +260,6 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
public void testAsyncWriteIdleTimeoutFires(Transport transport) throws Exception
{
init(transport);
// TODO work out why this test fails for UNIX_SOCKET
Assumptions.assumeFalse(scenario.transport == UNIX_SOCKET);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.http.client;
public enum Transport
{
HTTP, HTTPS, H2C, H2, FCGI, UNIX_SOCKET;
HTTP, HTTPS, H2C, H2, H3, FCGI, UNIX_DOMAIN;
public boolean isHttp1Based()
{
@ -29,6 +29,6 @@ public enum Transport
public boolean isTlsBased()
{
return this == HTTPS || this == H2;
return this == HTTPS || this == H2 || this == H3;
}
}

View File

@ -14,11 +14,9 @@
package org.eclipse.jetty.http.client;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.stream.Stream;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
@ -32,10 +30,7 @@ public class TransportProvider implements ArgumentsProvider
if (!StringUtil.isBlank(transports))
return Arrays.stream(transports.split("\\s*,\\s*")).map(Transport::valueOf);
if (OS.LINUX.isCurrentOs())
return Arrays.stream(Transport.values());
return EnumSet.complementOf(EnumSet.of(Transport.UNIX_SOCKET)).stream();
return Arrays.stream(Transport.values());
}
@Override

View File

@ -19,7 +19,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import javax.servlet.http.HttpServlet;
@ -36,6 +36,10 @@ import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.http.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.AbstractConnector;
@ -45,15 +49,16 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HostHeaderCustomizer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
import org.eclipse.jetty.unixsocket.server.UnixSocketConnector;
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -61,7 +66,6 @@ import org.junit.jupiter.api.Assumptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -77,43 +81,22 @@ public class TransportScenario
protected ServletContextHandler context;
protected String servletPath = "/servlet";
protected HttpClient client;
protected Path sockFile;
protected Path unixDomainPath;
protected final BlockingQueue<String> requestLog = new BlockingArrayQueue<>();
public TransportScenario(final Transport transport) throws IOException
public TransportScenario(Transport transport) throws IOException
{
this.transport = transport;
String dir = System.getProperty("jetty.unixdomain.dir");
assertNotNull(dir);
sockFile = Files.createTempFile(Path.of(dir), "unix_", ".sock");
assertTrue(sockFile.toAbsolutePath().toString().length() < UnixSocketConnector.MAX_UNIX_SOCKET_PATH_LENGTH, "Unix-Domain path too long");
Files.delete(sockFile);
// Disable UNIX_SOCKET due to jnr/jnr-unixsocket#69.
Assumptions.assumeTrue(transport != UNIX_SOCKET);
}
public Optional<String> getNetworkConnectorLocalPort()
{
if (connector instanceof ServerConnector)
if (transport == Transport.UNIX_DOMAIN)
{
ServerConnector serverConnector = (ServerConnector)connector;
return Optional.of(Integer.toString(serverConnector.getLocalPort()));
Assumptions.assumeTrue(JavaVersion.VERSION.getPlatform() >= 16);
String dir = System.getProperty("jetty.unixdomain.dir");
assertNotNull(dir);
unixDomainPath = Files.createTempFile(Path.of(dir), "unix_", ".sock");
assertTrue(unixDomainPath.toAbsolutePath().toString().length() < UnixDomainServerConnector.MAX_UNIX_DOMAIN_PATH_LENGTH, "Unix-Domain path too long");
Files.delete(unixDomainPath);
}
return Optional.empty();
}
public Optional<Integer> getNetworkConnectorLocalPortInt()
{
if (connector instanceof ServerConnector)
{
ServerConnector serverConnector = (ServerConnector)connector;
return Optional.of(serverConnector.getLocalPort());
}
return Optional.empty();
}
public String getScheme()
@ -121,14 +104,6 @@ public class TransportScenario
return transport.isTlsBased() ? "https" : "http";
}
public HTTP2Client newHTTP2Client(SslContextFactory.Client sslContextFactory)
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(sslContextFactory);
return new HTTP2Client(clientConnector);
}
public HttpClient newHttpClient(HttpClientTransport transport)
{
return new HttpClient(transport);
@ -136,13 +111,30 @@ public class TransportScenario
public Connector newServerConnector(Server server)
{
if (transport == Transport.UNIX_SOCKET)
switch (transport)
{
UnixSocketConnector unixSocketConnector = new UnixSocketConnector(server, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixSocket(sockFile.toString());
return unixSocketConnector;
case HTTP:
case HTTPS:
case H2C:
case H2:
case FCGI:
return new ServerConnector(server, provideServerConnectionFactory(transport));
case H3:
return new HTTP3ServerConnector(server, sslContextFactory, provideServerConnectionFactory(transport));
case UNIX_DOMAIN:
UnixDomainServerConnector connector = new UnixDomainServerConnector(server, provideServerConnectionFactory(transport));
connector.setUnixDomainPath(unixDomainPath);
return connector;
default:
throw new IllegalStateException();
}
return new ServerConnector(server, provideServerConnectionFactory(transport));
}
public OptionalInt getServerPort()
{
if (connector instanceof NetworkConnector)
return OptionalInt.of(((NetworkConnector)connector).getLocalPort());
return OptionalInt.empty();
}
public String newURI()
@ -150,8 +142,7 @@ public class TransportScenario
StringBuilder ret = new StringBuilder();
ret.append(getScheme());
ret.append("://localhost");
Optional<String> localPort = getNetworkConnectorLocalPort();
localPort.ifPresent(s -> ret.append(':').append(s));
getServerPort().ifPresent(s -> ret.append(':').append(s));
return ret.toString();
}
@ -170,16 +161,31 @@ public class TransportScenario
case H2C:
case H2:
{
HTTP2Client http2Client = newHTTP2Client(sslContextFactory);
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(sslContextFactory);
HTTP2Client http2Client = new HTTP2Client(clientConnector);
return new HttpClientTransportOverHTTP2(http2Client);
}
case H3:
{
HTTP3Client http3Client = new HTTP3Client();
ClientConnector clientConnector = http3Client.getClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(sslContextFactory);
http3Client.getQuicConfiguration().setVerifyPeerCertificates(false);
return new HttpClientTransportOverHTTP3(http3Client);
}
case FCGI:
{
return new HttpClientTransportOverFCGI(1, "");
}
case UNIX_SOCKET:
case UNIX_DOMAIN:
{
return new HttpClientTransportOverUnixSockets(sockFile.toString());
ClientConnector clientConnector = ClientConnector.forUnixDomain(unixDomainPath);
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(sslContextFactory);
return new HttpClientTransportOverHTTP(clientConnector);
}
default:
{
@ -193,8 +199,8 @@ public class TransportScenario
List<ConnectionFactory> result = new ArrayList<>();
switch (transport)
{
case UNIX_SOCKET:
case HTTP:
case UNIX_DOMAIN:
{
result.add(new HttpConnectionFactory(httpConfig));
break;
@ -226,6 +232,14 @@ public class TransportScenario
result.add(h2);
break;
}
case H3:
{
httpConfig.addCustomizer(new SecureRequestCustomizer());
httpConfig.addCustomizer(new HostHeaderCustomizer());
HTTP3ServerConnectionFactory h3 = new HTTP3ServerConnectionFactory(httpConfig);
result.add(h3);
break;
}
case FCGI:
{
result.add(new ServerFCGIConnectionFactory(httpConfig));
@ -236,7 +250,7 @@ public class TransportScenario
throw new IllegalArgumentException();
}
}
return result.toArray(new ConnectionFactory[0]);
return result.toArray(ConnectionFactory[]::new);
}
public void setConnectionIdleTimeout(long idleTimeout)
@ -396,15 +410,15 @@ public class TransportScenario
LOG.trace("IGNORED", x);
}
if (sockFile != null)
if (unixDomainPath != null)
{
try
{
Files.deleteIfExists(sockFile);
Files.deleteIfExists(unixDomainPath);
}
catch (IOException e)
{
LOG.warn("Unable to delete sockFile: {}", sockFile, e);
LOG.warn("Unable to delete sockFile: {}", unixDomainPath, e);
}
}
}

View File

@ -1,4 +1,3 @@
# Jetty Logging using jetty-slf4j-impl
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.fcgi.LEVEL=DEBUG
@ -6,4 +5,8 @@
#org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.http3.LEVEL=DEBUG
org.eclipse.jetty.http3.qpack.LEVEL=INFO
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO
#org.eclipse.jetty.io.LEVEL=DEBUG