From e6cd0bae6415818fe722265915446fea1e695e48 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Mon, 22 Mar 2021 10:05:37 +0100 Subject: [PATCH] start quic client plumbing Signed-off-by: Ludovic Orban --- jetty-http3/http3-client/pom.xml | 45 ++ .../http3/client/ClientDatagramEndPoint.java | 439 ++++++++++++++++++ .../client/HttpClientTransportOverQuic.java | 91 ++++ .../http3/client/QuicClientConnector.java | 366 +++++++++++++++ .../jetty/http3/client/QuicConnection.java | 237 ++++++++++ .../jetty/http3/client/QuicSession.java | 327 +++++++++++++ .../http3/client/QuicStreamEndPoint.java | 150 ++++++ .../jetty/http3/client/End2EndClientTest.java | 103 ++++ .../test/resources/jetty-logging.properties | 3 + .../src/test/resources/keystore.p12 | Bin 0 -> 2445 bytes jetty-http3/http3-common/pom.xml | 40 ++ .../src/main/java/module-info.java | 20 + jetty-http3/http3-server/pom.xml | 2 +- jetty-http3/pom.xml | 2 + 14 files changed, 1824 insertions(+), 1 deletion(-) create mode 100644 jetty-http3/http3-client/pom.xml create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/ClientDatagramEndPoint.java create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HttpClientTransportOverQuic.java create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicClientConnector.java create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicConnection.java create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicSession.java create mode 100644 jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicStreamEndPoint.java create mode 100644 jetty-http3/http3-client/src/test/java/org/eclipse/jetty/http3/client/End2EndClientTest.java create mode 100644 jetty-http3/http3-client/src/test/resources/jetty-logging.properties create mode 100644 jetty-http3/http3-client/src/test/resources/keystore.p12 create mode 100644 jetty-http3/http3-common/pom.xml create mode 100644 jetty-http3/http3-common/src/main/java/module-info.java diff --git a/jetty-http3/http3-client/pom.xml b/jetty-http3/http3-client/pom.xml new file mode 100644 index 00000000000..9af9513dd35 --- /dev/null +++ b/jetty-http3/http3-client/pom.xml @@ -0,0 +1,45 @@ + + + + org.eclipse.jetty.http3 + http3-parent + 10.0.2-SNAPSHOT + + + 4.0.0 + http3-client + Jetty :: HTTP3 :: Client + + + ${project.groupId}.client + + + + + org.slf4j + slf4j-api + + + org.eclipse.jetty.http3 + http3-common + ${project.version} + + + org.eclipse.jetty + jetty-client + ${project.version} + + + + org.eclipse.jetty.http3 + http3-server + ${project.version} + test + + + org.eclipse.jetty + jetty-slf4j-impl + test + + + diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/ClientDatagramEndPoint.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/ClientDatagramEndPoint.java new file mode 100644 index 00000000000..fd2bfdf15de --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/ClientDatagramEndPoint.java @@ -0,0 +1,439 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Objects; + +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable +{ + private static final Logger LOG = LoggerFactory.getLogger(ClientDatagramEndPoint.class); + + public static InetAddressArgument INET_ADDRESS_ARGUMENT = new InetAddressArgument(); + + private final AutoLock _lock = new AutoLock(); + private final DatagramChannel _channel; + private final ManagedSelector _selector; + private SelectionKey _key; + private boolean _updatePending; + // The current value for interestOps. + private int _currentInterestOps = SelectionKey.OP_READ; // See DatagramReader.update() + // The desired value for interestOps. + private int _desiredInterestOps; + + private abstract class RunnableTask implements Runnable, Invocable + { + final String _operation; + + protected RunnableTask(String op) + { + _operation = op; + } + + @Override + public String toString() + { + return String.format("%s:%s:%s", this, _operation, getInvocationType()); + } + } + + private abstract class RunnableCloseable extends RunnableTask implements Closeable + { + protected RunnableCloseable(String op) + { + super(op); + } + + @Override + public void close() + { + try + { + this.close(); + } + catch (Throwable x) + { + LOG.warn("Unable to close {}", this, x); + } + } + } + + private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction; + + private final Runnable _runFillable = new RunnableCloseable("runFillable") + { + @Override + public Invocable.InvocationType getInvocationType() + { + return getFillInterest().getCallbackInvocationType(); + } + + @Override + public void run() + { + getFillInterest().fillable(); + } + }; + + private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite") + { + @Override + public Invocable.InvocationType getInvocationType() + { + return getWriteFlusher().getCallbackInvocationType(); + } + + @Override + public void run() + { + getWriteFlusher().completeWrite(); + } + + @Override + public String toString() + { + return String.format("%s:%s:%s->%s", this, _operation, getInvocationType(), getWriteFlusher()); + } + }; + + private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable") + { + @Override + public Invocable.InvocationType getInvocationType() + { + Invocable.InvocationType fillT = getFillInterest().getCallbackInvocationType(); + Invocable.InvocationType flushT = getWriteFlusher().getCallbackInvocationType(); + if (fillT == flushT) + return fillT; + + if (fillT == Invocable.InvocationType.EITHER && flushT == Invocable.InvocationType.NON_BLOCKING) + return Invocable.InvocationType.EITHER; + + if (fillT == Invocable.InvocationType.NON_BLOCKING && flushT == Invocable.InvocationType.EITHER) + return Invocable.InvocationType.EITHER; + + return Invocable.InvocationType.BLOCKING; + } + + @Override + public void run() + { + getWriteFlusher().completeWrite(); + getFillInterest().fillable(); + } + }; + + public ClientDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + { + super(scheduler); + _channel = channel; + _selector = selector; + _key = key; + } + + @Override + public InetSocketAddress getLocalAddress() + { + return (InetSocketAddress)_channel.socket().getLocalSocketAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return null; + } + + @Override + public boolean isOpen() + { + return _channel.isOpen(); + } + + @Override + protected void doShutdownOutput() + { + } + + @Override + public void doClose() + { + if (LOG.isDebugEnabled()) + LOG.debug("doClose {}", this); + try + { + _channel.close(); + } + catch (IOException e) + { + LOG.debug("Unable to close channel", e); + } + finally + { + super.doClose(); + } + } + + @Override + public void onClose(Throwable cause) + { + try + { + super.onClose(cause); + } + finally + { + if (_selector != null) + _selector.destroyEndPoint(this, cause); + } + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + if (isInputShutdown()) + return -1; + + int pos = BufferUtil.flipToFill(buffer); + InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer); + if (peer == null) + { + BufferUtil.flipToFlush(buffer, pos); + return 0; + } + INET_ADDRESS_ARGUMENT.push(peer); + + notIdle(); + BufferUtil.flipToFlush(buffer, pos); + int filled = buffer.remaining(); + if (LOG.isDebugEnabled()) + LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer)); + return filled; + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + boolean flushedAll = true; + long flushed = 0; + try + { + InetSocketAddress peer = INET_ADDRESS_ARGUMENT.pop(); + if (LOG.isDebugEnabled()) + LOG.debug("flushing {} buffer(s) to {}", buffers.length - 1, peer); + for (ByteBuffer buffer : buffers) + { + int sent = _channel.send(buffer, peer); + if (sent == 0) + { + flushedAll = false; + break; + } + flushed += sent; + } + if (LOG.isDebugEnabled()) + LOG.debug("flushed {} byte(s), all flushed? {} - {}", flushed, flushedAll, this); + } + catch (IOException e) + { + throw new EofException(e); + } + + if (flushed > 0) + notIdle(); + + return flushedAll; + } + + public DatagramChannel getChannel() + { + return _channel; + } + + @Override + public Object getTransport() + { + return _channel; + } + + @Override + protected void needsFillInterest() + { + changeInterests(SelectionKey.OP_READ); + } + + @Override + protected void onIncompleteFlush() + { + changeInterests(SelectionKey.OP_WRITE); + } + + @Override + public Runnable onSelected() + { + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). + + int readyOps = _key.readyOps(); + int oldInterestOps; + int newInterestOps; + try (AutoLock l = _lock.lock()) + { + _updatePending = true; + // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps & ~readyOps; + _desiredInterestOps = newInterestOps; + } + + boolean fillable = (readyOps & SelectionKey.OP_READ) != 0; + boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0; + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this); + + // return task to complete the job + Runnable task = fillable + ? (flushable + ? _runCompleteWriteFillable + : _runFillable) + : (flushable + ? _runCompleteWrite + : null); + + if (LOG.isDebugEnabled()) + LOG.debug("task {}", task); + return task; + } + + private void updateKeyAction(Selector selector) + { + updateKey(); + } + + @Override + public void updateKey() + { + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). + + try + { + int oldInterestOps; + int newInterestOps; + try (AutoLock l = _lock.lock()) + { + _updatePending = false; + oldInterestOps = _currentInterestOps; + newInterestOps = _desiredInterestOps; + if (oldInterestOps != newInterestOps) + { + _currentInterestOps = newInterestOps; + _key.interestOps(newInterestOps); + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); + } + catch (CancelledKeyException x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring key update for cancelled key {}", this, x); + close(); + } + catch (Throwable x) + { + LOG.warn("Ignoring key update for {}", this, x); + close(); + } + } + + @Override + public void replaceKey(SelectionKey newKey) + { + _key = newKey; + } + + private void changeInterests(int operation) + { + // This method runs from any thread, possibly + // concurrently with updateKey() and onSelected(). + + int oldInterestOps; + int newInterestOps; + boolean pending; + try (AutoLock l = _lock.lock()) + { + pending = _updatePending; + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps | operation; + if (newInterestOps != oldInterestOps) + _desiredInterestOps = newInterestOps; + } + + if (LOG.isDebugEnabled()) + LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); + + if (!pending && _selector != null) + _selector.submit(_updateKeyAction); + } + + @Override + public String toEndPointString() + { + // We do a best effort to print the right toString() and that's it. + return String.format("%s{io=%d/%d,kio=%d,kro=%d}", + super.toEndPointString(), + _currentInterestOps, + _desiredInterestOps, + ManagedSelector.safeInterestOps(_key), + ManagedSelector.safeReadyOps(_key)); + } + + public final static class InetAddressArgument + { + private final ThreadLocal threadLocal = new ThreadLocal<>(); + + public void push(InetSocketAddress inetSocketAddress) + { + Objects.requireNonNull(inetSocketAddress); + threadLocal.set(inetSocketAddress); + } + + public InetSocketAddress pop() + { + InetSocketAddress inetSocketAddress = threadLocal.get(); + Objects.requireNonNull(inetSocketAddress); + threadLocal.remove(); + return inetSocketAddress; + } + } +} diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HttpClientTransportOverQuic.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HttpClientTransportOverQuic.java new file mode 100644 index 00000000000..98efa5a8144 --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HttpClientTransportOverQuic.java @@ -0,0 +1,91 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Map; + +import org.eclipse.jetty.client.AbstractHttpClientTransport; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpDestination; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.MultiplexHttpDestination; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.http.HttpClientConnectionFactory; +import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.annotation.ManagedObject; + +@ManagedObject("The QUIC client transport") +public class HttpClientTransportOverQuic extends AbstractHttpClientTransport +{ + private final ClientConnectionFactory connectionFactory = new HttpClientConnectionFactory(); + private final QuicClientConnector connector; + private final Origin.Protocol protocol; + + public HttpClientTransportOverQuic() + { + this("http/1.1"); + } + + public HttpClientTransportOverQuic(String... alpnProtocols) + { + //TODO the Protocol instance should be passed around instead of the alpn string array + connector = new QuicClientConnector(alpnProtocols); + protocol = new Origin.Protocol(Arrays.asList(alpnProtocols), false); + addBean(connector); + setConnectionPoolFactory(destination -> + { + HttpClient httpClient = getHttpClient(); + int maxConnections = httpClient.getMaxConnectionsPerDestination(); + return new MultiplexConnectionPool(destination, maxConnections, destination, httpClient.getMaxRequestsQueuedPerDestination()); + }); + } + + @Override + public Connection newConnection(EndPoint endPoint, Map context) throws IOException + { + endPoint.setIdleTimeout(getHttpClient().getIdleTimeout()); + return connectionFactory.newConnection(endPoint, context); + } + + @Override + public Origin newOrigin(HttpRequest request) + { + return getHttpClient().createOrigin(request, protocol); + } + + @Override + public HttpDestination newHttpDestination(Origin origin) + { + return new MultiplexHttpDestination(getHttpClient(), origin); + } + + @Override + public void connect(InetSocketAddress address, Map context) + { + HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY); + context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory()); + @SuppressWarnings("unchecked") + Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY); + context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(ioConnection -> {}, promise::failed)); + connector.connect(address, context); + } +} diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicClientConnector.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicClientConnector.java new file mode 100644 index 00000000000..026ff879200 --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicClientConnector.java @@ -0,0 +1,366 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.StandardSocketOptions; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.DatagramChannel; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; + +import org.eclipse.jetty.http3.quiche.QuicheConfig; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.IClientConnector; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.eclipse.jetty.util.thread.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuicClientConnector extends ContainerLifeCycle implements IClientConnector +{ + public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector"; + public static final String REMOTE_SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".remoteSocketAddress"; + public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory"; + public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise"; + private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class); + + private final QuicheConfig quicheConfig; + private Executor executor; + private Scheduler scheduler; + private ByteBufferPool byteBufferPool; + private SslContextFactory.Client sslContextFactory; + private ClientDatagramSelectorManager selectorManager; + private int selectors = 1; + private Duration connectTimeout = Duration.ofSeconds(5); + private Duration idleTimeout = Duration.ofSeconds(30); + private SocketAddress bindAddress; + private boolean reuseAddress = true; + + public QuicClientConnector(String... alpnProtocol) + { + quicheConfig = new QuicheConfig(); + quicheConfig.setApplicationProtos(alpnProtocol); + quicheConfig.setMaxIdleTimeout(5000L); + quicheConfig.setInitialMaxData(10000000L); + quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L); + quicheConfig.setInitialMaxStreamDataUni(10000000L); + quicheConfig.setInitialMaxStreamsBidi(100L); + quicheConfig.setInitialMaxStreamsUni(100L); + quicheConfig.setDisableActiveMigration(true); + quicheConfig.setVerifyPeer(false); + } + + public Executor getExecutor() + { + return executor; + } + + public void setExecutor(Executor executor) + { + if (isStarted()) + throw new IllegalStateException(); + updateBean(this.executor, executor); + this.executor = executor; + } + + public Scheduler getScheduler() + { + return scheduler; + } + + public void setScheduler(Scheduler scheduler) + { + if (isStarted()) + throw new IllegalStateException(); + updateBean(this.scheduler, scheduler); + this.scheduler = scheduler; + } + + public ByteBufferPool getByteBufferPool() + { + return byteBufferPool; + } + + public void setByteBufferPool(ByteBufferPool byteBufferPool) + { + if (isStarted()) + throw new IllegalStateException(); + updateBean(this.byteBufferPool, byteBufferPool); + this.byteBufferPool = byteBufferPool; + } + + public SslContextFactory.Client getSslContextFactory() + { + return sslContextFactory; + } + + public void setSslContextFactory(SslContextFactory.Client sslContextFactory) + { + if (isStarted()) + throw new IllegalStateException(); + updateBean(this.sslContextFactory, sslContextFactory); + this.sslContextFactory = sslContextFactory; + } + + public int getSelectors() + { + return selectors; + } + + public void setSelectors(int selectors) + { + if (isStarted()) + throw new IllegalStateException(); + this.selectors = selectors; + } + + public boolean isConnectBlocking() + { + return false; + } + + public void setConnectBlocking(boolean connectBlocking) + { + } + + public Duration getConnectTimeout() + { + return connectTimeout; + } + + public void setConnectTimeout(Duration connectTimeout) + { + this.connectTimeout = connectTimeout; + if (selectorManager != null) + selectorManager.setConnectTimeout(connectTimeout.toMillis()); + } + + public Duration getIdleTimeout() + { + return idleTimeout; + } + + public void setIdleTimeout(Duration idleTimeout) + { + this.idleTimeout = idleTimeout; + } + + public SocketAddress getBindAddress() + { + return bindAddress; + } + + public void setBindAddress(SocketAddress bindAddress) + { + this.bindAddress = bindAddress; + } + + public boolean getReuseAddress() + { + return reuseAddress; + } + + public void setReuseAddress(boolean reuseAddress) + { + this.reuseAddress = reuseAddress; + } + + @Override + protected void doStart() throws Exception + { + if (executor == null) + { + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName(String.format("client-pool@%x", hashCode())); + setExecutor(clientThreads); + } + if (scheduler == null) + setScheduler(new ScheduledExecutorScheduler(String.format("client-scheduler@%x", hashCode()), false)); + if (byteBufferPool == null) + setByteBufferPool(new MappedByteBufferPool()); + if (sslContextFactory == null) + setSslContextFactory(newSslContextFactory()); + selectorManager = newSelectorManager(); + selectorManager.setConnectTimeout(getConnectTimeout().toMillis()); + addBean(selectorManager); + super.doStart(); + } + + @Override + protected void doStop() throws Exception + { + super.doStop(); + removeBean(selectorManager); + } + + protected SslContextFactory.Client newSslContextFactory() + { + SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(false); + sslContextFactory.setEndpointIdentificationAlgorithm("HTTPS"); + return sslContextFactory; + } + + protected ClientDatagramSelectorManager newSelectorManager() + { + return new ClientDatagramSelectorManager(getExecutor(), getScheduler(), getSelectors()); + } + + public void connect(SocketAddress address, Map context) + { + DatagramChannel channel = null; + try + { + if (context == null) + context = new HashMap<>(); + context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this); + context.putIfAbsent(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY, address); + + channel = DatagramChannel.open(); + SocketAddress bindAddress = getBindAddress(); + if (bindAddress != null) + { + boolean reuseAddress = getReuseAddress(); + if (LOG.isDebugEnabled()) + LOG.debug("Binding to {} to connect to {}{}", bindAddress, address, (reuseAddress ? " reusing address" : "")); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); + channel.bind(bindAddress); + } + configure(channel); + + if (LOG.isDebugEnabled()) + LOG.debug("Connecting to {}", address); + channel.configureBlocking(false); + + selectorManager.connect(channel, context); + } + // Must catch all exceptions, since some like + // UnresolvedAddressException are not IOExceptions. + catch (Throwable x) + { + // If IPv6 is not deployed, a generic SocketException "Network is unreachable" + // exception is being thrown, so we attempt to provide a better error message. + if (x.getClass() == SocketException.class) + x = new SocketException("Could not connect to " + address).initCause(x); + IO.close(channel); + connectFailed(x, context); + } + } + + public void accept(DatagramChannel channel, Map context) + { + try + { + context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this); + if (!channel.isConnected()) + throw new IllegalStateException("DatagramChannel must be connected"); + configure(channel); + channel.configureBlocking(false); + selectorManager.accept(channel, context); + } + catch (Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not accept {}", channel); + IO.close(channel); + Promise promise = (Promise)context.get(CONNECTION_PROMISE_CONTEXT_KEY); + if (promise != null) + promise.failed(failure); + } + } + + protected void configure(DatagramChannel channel) throws IOException + { + } + + protected EndPoint newEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey) + { + return new ClientDatagramEndPoint(channel, selector, selectionKey, getScheduler()); + } + + protected void connectFailed(Throwable failure, Map context) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not connect to {}", context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY)); + Promise promise = (Promise)context.get(CONNECTION_PROMISE_CONTEXT_KEY); + if (promise != null) + promise.failed(failure); + } + + protected class ClientDatagramSelectorManager extends SelectorManager + { + public ClientDatagramSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + super(executor, scheduler, selectors); + } + + @Override + public void connect(SelectableChannel channel, Object attachment) + { + throw new UnsupportedOperationException("TODO"); + } + + @Override + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) + { + EndPoint endPoint = QuicClientConnector.this.newEndPoint((DatagramChannel)channel, selector, selectionKey); + endPoint.setIdleTimeout(getIdleTimeout().toMillis()); + return endPoint; + } + + @Override + public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException + { + @SuppressWarnings("unchecked") + Map context = (Map)attachment; + return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, context, quicheConfig); + } + + @Override + public void connectionOpened(Connection connection, Object context) + { + super.connectionOpened(connection, context); + @SuppressWarnings("unchecked") + Map contextMap = (Map)context; + @SuppressWarnings("unchecked") + Promise promise = (Promise)contextMap.get(CONNECTION_PROMISE_CONTEXT_KEY); + if (promise != null) + promise.succeeded(connection); + } + + @Override + protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment) + { + @SuppressWarnings("unchecked") + Map context = (Map)attachment; + connectFailed(failure, context); + } + } +} diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicConnection.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicConnection.java new file mode 100644 index 00000000000..81c3e2f9355 --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicConnection.java @@ -0,0 +1,237 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +import org.eclipse.jetty.http3.quiche.QuicheConfig; +import org.eclipse.jetty.http3.quiche.QuicheConnection; +import org.eclipse.jetty.http3.quiche.QuicheConnectionId; +import org.eclipse.jetty.http3.quiche.ffi.LibQuiche; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuicConnection extends AbstractConnection +{ + private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class); + + private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + private final QuicheConfig quicheConfig; + private final ByteBufferPool byteBufferPool; + private final Flusher flusher = new Flusher(); + private final Scheduler scheduler; + private final Map context; + + public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map context, QuicheConfig quicheConfig) + { + super(endPoint, executor); + this.quicheConfig = quicheConfig; + this.scheduler = scheduler; + this.byteBufferPool = byteBufferPool; + this.context = context; + } + + void onClose(QuicheConnectionId quicheConnectionId) + { + sessions.remove(quicheConnectionId); + } + + @Override + public void close() + { + sessions.values().forEach(QuicSession::close); + super.close(); + } + + @Override + public void onOpen() + { + super.onOpen(); + fillInterested(); + } + + @Override + public void onFillable() + { + try + { + ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); + while (true) + { + BufferUtil.clear(cipherBuffer); + int fill = getEndPoint().fill(cipherBuffer); + if (LOG.isDebugEnabled()) + LOG.debug("filled cipher buffer with {} byte(s)", fill); + // ServerDatagramEndPoint will only return -1 if input is shut down. + if (fill < 0) + { + byteBufferPool.release(cipherBuffer); + getEndPoint().shutdownOutput(); + return; + } + if (fill == 0) + { + byteBufferPool.release(cipherBuffer); + fillInterested(); + return; + } + + InetSocketAddress remoteAddress = ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop(); + if (LOG.isDebugEnabled()) + LOG.debug("decoded peer IP address: {}, ciphertext packet size: {}", remoteAddress, cipherBuffer.remaining()); + + QuicheConnectionId quicheConnectionId = QuicheConnectionId.fromPacket(cipherBuffer); + if (quicheConnectionId == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("packet contains undecipherable connection ID, dropping it"); + continue; + } + if (LOG.isDebugEnabled()) + LOG.debug("packet contains connection ID {}", quicheConnectionId); + + QuicSession session = sessions.get(quicheConnectionId); + if (session == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("no existing session with connection ID {}, trying to accept new QUIC connection", quicheConnectionId); + QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, remoteAddress, cipherBuffer); + if (quicheConnection == null) + { + ByteBuffer negotiationBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); + int pos = BufferUtil.flipToFill(negotiationBuffer); + if (!QuicheConnection.negotiate(remoteAddress, cipherBuffer, negotiationBuffer)) + { + if (LOG.isDebugEnabled()) + LOG.debug("QUIC connection negotiation failed, dropping packet"); + byteBufferPool.release(negotiationBuffer); + continue; + } + BufferUtil.flipToFlush(negotiationBuffer, pos); + + ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress); + getEndPoint().write(Callback.from(() -> byteBufferPool.release(negotiationBuffer)), negotiationBuffer); + if (LOG.isDebugEnabled()) + LOG.debug("QUIC connection negotiation packet sent"); + } + else + { + session = new QuicSession(getExecutor(), scheduler, byteBufferPool, context, quicheConnectionId, quicheConnection, this, remoteAddress); + sessions.putIfAbsent(quicheConnectionId, session); + session.flush(); // send the response packet(s) that accept generated. + if (LOG.isDebugEnabled()) + LOG.debug("created QUIC session {} with connection ID {}", session, quicheConnectionId); + } + + // Once here, cipherBuffer has been fully consumed. + continue; + } + + if (LOG.isDebugEnabled()) + LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining()); + session.process(remoteAddress, cipherBuffer); + } + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("caught exception in onFillable loop", x); + close(); + } + } + + public void write(Callback callback, InetSocketAddress remoteAddress, ByteBuffer... buffers) + { + flusher.offer(callback, remoteAddress, buffers); + flusher.iterate(); + } + + private class Flusher extends IteratingCallback + { + private final AutoLock lock = new AutoLock(); + private final ArrayDeque queue = new ArrayDeque<>(); + private Entry entry; + + public void offer(Callback callback, InetSocketAddress address, ByteBuffer[] buffers) + { + try (AutoLock l = lock.lock()) + { + queue.offer(new Entry(callback, address, buffers)); + } + } + + @Override + protected Action process() + { + try (AutoLock l = lock.lock()) + { + entry = queue.poll(); + } + if (entry == null) + return Action.IDLE; + + ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address); + getEndPoint().write(this, entry.buffers); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + entry.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable x) + { + entry.callback.failed(x); + super.failed(x); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + QuicConnection.this.close(); + } + + private class Entry + { + private final Callback callback; + private final InetSocketAddress address; + private final ByteBuffer[] buffers; + + private Entry(Callback callback, InetSocketAddress address, ByteBuffer[] buffers) + { + this.callback = callback; + this.address = address; + this.buffers = buffers; + } + } + } +} diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicSession.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicSession.java new file mode 100644 index 00000000000..3247ad88264 --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicSession.java @@ -0,0 +1,327 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http3.quiche.QuicheConnection; +import org.eclipse.jetty.http3.quiche.QuicheConnectionId; +import org.eclipse.jetty.http3.quiche.ffi.LibQuiche; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.CyclicTimeout; +import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuicSession +{ + private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); + + private final Flusher flusher; + private final Scheduler scheduler; + private final ByteBufferPool byteBufferPool; + private final Map context; + private final QuicheConnectionId quicheConnectionId; + private final QuicheConnection quicheConnection; + private final QuicConnection connection; + private final ConcurrentMap endpoints = new ConcurrentHashMap<>(); + private final ExecutionStrategy strategy; + private final AutoLock strategyQueueLock = new AutoLock(); + private final Queue strategyQueue = new ArrayDeque<>(); + private InetSocketAddress remoteAddress; + + QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, Map context, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress) + { + this.scheduler = scheduler; + this.byteBufferPool = byteBufferPool; + this.context = context; + this.quicheConnectionId = quicheConnectionId; + this.quicheConnection = quicheConnection; + this.connection = connection; + this.remoteAddress = remoteAddress; + this.flusher = new Flusher(scheduler); + this.strategy = new EatWhatYouKill(() -> + { + try (AutoLock l = strategyQueueLock.lock()) + { + return strategyQueue.poll(); + } + }, executor); + LifeCycle.start(strategy); + } + + public int fill(long streamId, ByteBuffer buffer) throws IOException + { + return quicheConnection.drainClearTextForStream(streamId, buffer); + } + + public int flush(long streamId, ByteBuffer buffer) throws IOException + { + int flushed = quicheConnection.feedClearTextForStream(streamId, buffer); + flush(); + return flushed; + } + + public void flushFinished(long streamId) throws IOException + { + quicheConnection.feedFinForStream(streamId); + flush(); + } + + public boolean isFinished(long streamId) + { + return quicheConnection.isStreamFinished(streamId); + } + + public void shutdownInput(long streamId) throws IOException + { + quicheConnection.shutdownStream(streamId, false); + } + + public void shutdownOutput(long streamId) throws IOException + { + quicheConnection.shutdownStream(streamId, true); + } + + public void onClose(long streamId) + { + endpoints.remove(streamId); + } + + InetSocketAddress getLocalAddress() + { + return connection.getEndPoint().getLocalAddress(); + } + + InetSocketAddress getRemoteAddress() + { + return remoteAddress; + } + + void process(InetSocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException + { + this.remoteAddress = remoteAddress; + quicheConnection.feedCipherText(cipherBufferIn); + + if (quicheConnection.isConnectionEstablished()) + { + List writableStreamIds = quicheConnection.writableStreamIds(); + if (LOG.isDebugEnabled()) + LOG.debug("writable stream ids: {}", writableStreamIds); + Runnable onWritable = () -> + { + for (Long writableStreamId : writableStreamIds) + { + onWritable(writableStreamId); + } + }; + dispatch(onWritable); + + List readableStreamIds = quicheConnection.readableStreamIds(); + if (LOG.isDebugEnabled()) + LOG.debug("readable stream ids: {}", readableStreamIds); + for (Long readableStreamId : readableStreamIds) + { + Runnable onReadable = () -> onReadable(readableStreamId); + dispatch(onReadable); + } + } + else + { + flush(); + } + } + + private void onWritable(long writableStreamId) + { + QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(writableStreamId); + if (LOG.isDebugEnabled()) + LOG.debug("selected endpoint for write: {}", streamEndPoint); + streamEndPoint.onWritable(); + } + + private void onReadable(long readableStreamId) + { + QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId); + if (LOG.isDebugEnabled()) + LOG.debug("selected endpoint for read: {}", streamEndPoint); + streamEndPoint.onReadable(); + } + + private void dispatch(Runnable runnable) + { + try (AutoLock l = strategyQueueLock.lock()) + { + strategyQueue.offer(runnable); + } + strategy.dispatch(); + } + + void flush() + { + flusher.iterate(); + } + + private QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId) + { + QuicStreamEndPoint endPoint = endpoints.compute(streamId, (sid, quicStreamEndPoint) -> + { + if (quicStreamEndPoint == null) + { + quicStreamEndPoint = createQuicStreamEndPoint(streamId); + if (LOG.isDebugEnabled()) + LOG.debug("creating endpoint for stream {}", sid); + } + return quicStreamEndPoint; + }); + if (LOG.isDebugEnabled()) + LOG.debug("returning endpoint for stream {}", streamId); + return endPoint; + } + + private QuicStreamEndPoint createQuicStreamEndPoint(long streamId) + { + ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(QuicClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY); + QuicStreamEndPoint endPoint = new QuicStreamEndPoint(scheduler, this, streamId); + try + { + Connection connection = connectionFactory.newConnection(endPoint, context); + endPoint.setConnection(connection); + endPoint.onOpen(); + connection.onOpen(); + return endPoint; + } + catch (IOException e) + { + throw new RuntimeIOException("Error creating new connection", e); + } + } + + public void close() + { + if (LOG.isDebugEnabled()) + LOG.debug("closing QUIC session {}", this); + try + { + endpoints.values().forEach(AbstractEndPoint::close); + endpoints.clear(); + flusher.close(); + connection.onClose(quicheConnectionId); + LifeCycle.stop(strategy); + } + finally + { + // This call frees malloc'ed memory so make sure it always happens. + quicheConnection.dispose(); + } + if (LOG.isDebugEnabled()) + LOG.debug("closed QUIC session {}", this); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + " id=" + quicheConnectionId; + } + + private class Flusher extends IteratingCallback + { + private final CyclicTimeout timeout; + private ByteBuffer cipherBuffer; + + public Flusher(Scheduler scheduler) + { + timeout = new CyclicTimeout(scheduler) { + @Override + public void onTimeoutExpired() + { + if (LOG.isDebugEnabled()) + LOG.debug("quiche timeout callback"); + quicheConnection.onTimeout(); + if (LOG.isDebugEnabled()) + LOG.debug("re-iterating quiche after timeout"); + iterate(); + } + }; + } + + @Override + public void close() + { + super.close(); + timeout.destroy(); + } + + @Override + protected Action process() throws IOException + { + cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); + int pos = BufferUtil.flipToFill(cipherBuffer); + int drained = quicheConnection.drainCipherText(cipherBuffer); + long nextTimeoutInMs = quicheConnection.nextTimeout(); + if (LOG.isDebugEnabled()) + LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs); + if (nextTimeoutInMs < 0) + timeout.cancel(); + else + timeout.schedule(nextTimeoutInMs, TimeUnit.MILLISECONDS); + if (drained == 0) + { + if (quicheConnection.isConnectionClosed()) + { + if (LOG.isDebugEnabled()) + LOG.debug("quiche connection is in closed state"); + QuicSession.this.close(); + } + return Action.IDLE; + } + BufferUtil.flipToFlush(cipherBuffer, pos); + connection.write(this, remoteAddress, cipherBuffer); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + byteBufferPool.release(cipherBuffer); + super.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + byteBufferPool.release(cipherBuffer); + } + } +} diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicStreamEndPoint.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicStreamEndPoint.java new file mode 100644 index 00000000000..3e8cad865b3 --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/QuicStreamEndPoint.java @@ -0,0 +1,150 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.thread.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuicStreamEndPoint extends AbstractEndPoint +{ + private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); + + private final QuicSession session; + private final long streamId; + + public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId) + { + super(scheduler); + this.session = session; + this.streamId = streamId; + } + + @Override + public InetSocketAddress getLocalAddress() + { + return session.getLocalAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return session.getRemoteAddress(); + } + + @Override + protected void doShutdownInput() + { + try + { + session.shutdownInput(streamId); + } + catch (IOException x) + { + if (LOG.isDebugEnabled()) + LOG.debug("error shutting down output", x); + } + } + + @Override + protected void doShutdownOutput() + { + try + { + session.shutdownOutput(streamId); + } + catch (IOException x) + { + if (LOG.isDebugEnabled()) + LOG.debug("error shutting down output", x); + } + } + + @Override + public void onClose(Throwable failure) + { + try + { + session.flushFinished(streamId); + } + catch (IOException e) + { + if (LOG.isDebugEnabled()) + LOG.debug("Error sending FIN on stream {}", streamId, e); + } + super.onClose(failure); + session.onClose(streamId); + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + int pos = BufferUtil.flipToFill(buffer); + int drained = session.fill(streamId, buffer); + BufferUtil.flipToFlush(buffer, pos); + if (session.isFinished(streamId)) + shutdownInput(); + return drained; + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + for (ByteBuffer buffer : buffers) + { + int flushed = session.flush(streamId, buffer); + if (LOG.isDebugEnabled()) + LOG.debug("flushed {} bytes to stream {}; buffer has remaining? {}", flushed, streamId, buffer.hasRemaining()); + if (buffer.hasRemaining()) + return false; + } + return true; + } + + @Override + public Object getTransport() + { + return session; + } + + public void onWritable() + { + getWriteFlusher().completeWrite(); + } + + public void onReadable() + { + getFillInterest().fillable(); + } + + @Override + protected void onIncompleteFlush() + { + // No need to do anything. + // See QuicSession.process(). + } + + @Override + protected void needsFillInterest() + { + // No need to do anything. + // See QuicSession.process(). + } +} diff --git a/jetty-http3/http3-client/src/test/java/org/eclipse/jetty/http3/client/End2EndClientTest.java b/jetty-http3/http3-client/src/test/java/org/eclipse/jetty/http3/client/End2EndClientTest.java new file mode 100644 index 00000000000..9f50276c4c8 --- /dev/null +++ b/jetty-http3/http3-client/src/test/java/org/eclipse/jetty/http3/client/End2EndClientTest.java @@ -0,0 +1,103 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client; + +import java.io.IOException; +import java.io.PrintWriter; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpCompliance; +import org.eclipse.jetty.http3.server.ServerDatagramConnector; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class End2EndClientTest +{ + private Server server; + + @BeforeEach + public void setUp() throws Exception + { + server = new Server(); + + HttpConfiguration config = new HttpConfiguration(); + config.setHttpCompliance(HttpCompliance.LEGACY); // enable HTTP/0.9 + HttpConnectionFactory connectionFactory = new HttpConnectionFactory(config); + + ServerDatagramConnector serverDatagramConnector = new ServerDatagramConnector(server, connectionFactory); + serverDatagramConnector.setPort(8443); + server.addConnector(serverDatagramConnector); + + server.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + PrintWriter writer = response.getWriter(); + writer.println("\n" + + "\t\n" + + "\t\tRequest served\n" + + "\t\n" + + ""); + } + }); + + server.start(); + } + + @AfterEach + public void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void name() throws Exception + { + HttpClientTransportOverQuic transport = new HttpClientTransportOverQuic(); + HttpClient client = new HttpClient(transport); + client.start(); + + ContentResponse response = client.GET("https://localhost:8443/"); + int status = response.getStatus(); + String contentAsString = response.getContentAsString(); + System.out.println("=========="); + System.out.println("Status: " + status); + System.out.println(contentAsString); + System.out.println("=========="); + + client.stop(); + + assertThat(status, is(200)); + assertThat(contentAsString, is("\n" + + "\t\n" + + "\t\tRequest served\n" + + "\t\n" + + "\n")); + } +} diff --git a/jetty-http3/http3-client/src/test/resources/jetty-logging.properties b/jetty-http3/http3-client/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000000..40c7fe7b946 --- /dev/null +++ b/jetty-http3/http3-client/src/test/resources/jetty-logging.properties @@ -0,0 +1,3 @@ +# Jetty Logging using jetty-slf4j-impl +#org.eclipse.jetty.LEVEL=DEBUG +#org.eclipse.jetty.http3.LEVEL=DEBUG diff --git a/jetty-http3/http3-client/src/test/resources/keystore.p12 b/jetty-http3/http3-client/src/test/resources/keystore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..0b56dd34ee9df09c61bd635095485252c86e6df7 GIT binary patch literal 2445 zcmY+EX*3iH8^@UiGh=V4u}mXd)|s&jakJfSb_u!M&@dR;8nV|E8e5Eg--W~=k|mVT zGH8@##JD8V*vYQj`=0l`_uLQ9dCvL$|IhRBhao}80CpgT1i8Qgk&pc~c87ddVD<3_2nYCs@bW} z?geurRi%7;edgcBV6iEpy-NCO{*0)r;KRT)b5lF#V*RkIBIi0uJR3HNJ|B`sCVD8r z0dv@<8+bqdMFLhj9Y4S?GBaUh%h7KLvG?f4$3M6Dxa4AGD_e)?v@x5+ecGR1|8P5H z-ZR&usqoe7YE>h0iUrS_)Sc;k^}s>z%W(->QtR0TCNI~S#wF1Q7gs4?#uF*t0*IX5 zi-Xg3EE$f;NuWTOcP$kr%v6V4$TXQkex@(?B@Ih=4!igIXA|y2?xS$y@hK0Fa zO^@G=TtY_W9nNqEl(L}%;fUUfMXp#$WXF$`4CWl2jo8?9rO97f#5?Ym>}${QYU{IJ zJ^X4omN22Kd8Cxlv@-jP$(+B4KzC_BoP*Dfu`BW+&y)PbA;!Zgmum+3jsCRo+B{VD z_91^#re9JY^rVdc$0YYFbZ;Gn+_S|n(e_dCsax!)#4Q(;HNA&?hM|X=Dw_>Dpfr}J z)jWNAh{3N=HWW>qawSxmZ7QqYaJNY%gjTlq@4{M?pkp0aTKh`AId7KHS8MCzZ;gmd zK5td)wJzKHbQw(NE1yYh+U6rFu3jd~WwyS>+d;7*TlZCpTP(4ByFGnD{LapKs8cd5 z(dmpXb!y1WDO}Ptgk3fjdA71{huqj31pp`a==U%(BMRpw=)}}khjwtd&A87-6O8J( z<(sqbS3s7-$Xx7pk#R!0-?Z<<=9O{cs@t|FxU91vieA1J$HeUi1iio)VE5;YD?tm+ z(UZg{*H8pvflLFTVzkIP#^|w_4SX)9t z?(95|fX`aE=G$876N?7czB4$J6>atJ9Ak!8rG2%31oyj3Q&!xX7&j3p%xiq$NomT~ zJ}O~kxIRnTE78EhKBep2w8s5Sv5r>1F&5tS_%!G3jXz)RjLdG?Y66yG zN&R1(Fz!Uz9LK{h*e{!KXf+8+k<7*%I45JBh5ULr@@E3&8ib zx>KeIvH-?|yIb)0{-Xn18v56?M`_PmB&{Y!7KF~xxHS#M_)YS&#@y|8H_CE0HF9~^cIQK`MuyI8Nk#!*{N6#$Ox_-4A0t4mNXh!ZJymW^$ zTx5q`*V`ufQhKrAT#6M=DSMpjb?)vvD|1_}WDbUC-mO}`413u1n3pCMyl@z2>|U$s zMR?`D`to2RuR;ZsWE5qxa%yLC$sn|w*FXh6>&i@gsgk2UmM`><<0tZ} z)mvTfSyYdMv=>$&QtLU|W7t5@IG@cSk2=)msZ#<+$*(Qcg(DAwii-{?Qa_k8wJ)Um z^Uo$X;U3ST_$xPmsEr={3XD&!;EFQZqa`m)Ru?Uy_1<7O05DutFrnv2bH-Z$8z z^Bcna8m9({Gt9n&9j>a+Xt}2m*f8+iB>T5?SDm9+FX>9md5i=G!U0y`1p-9CY#{jk yGbbrc;oIWY{%(G2sw^p2ZNCN(d>!yEq)t(jm@ztr3OEo{ + + + org.eclipse.jetty.http3 + http3-parent + 10.0.2-SNAPSHOT + + + 4.0.0 + http3-common + Jetty :: HTTP3 :: Common + + + ${project.groupId}.common + + + + + org.slf4j + slf4j-api + + + org.eclipse.jetty.http3 + cloudflare-quiche-jna + ${project.version} + + + + org.eclipse.jetty + jetty-io + ${project.version} + + + + org.eclipse.jetty + jetty-slf4j-impl + test + + + diff --git a/jetty-http3/http3-common/src/main/java/module-info.java b/jetty-http3/http3-common/src/main/java/module-info.java new file mode 100644 index 00000000000..5a31c1ea56e --- /dev/null +++ b/jetty-http3/http3-common/src/main/java/module-info.java @@ -0,0 +1,20 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +module org.eclipse.jetty.http3.common +{ + + requires transitive org.eclipse.jetty.http3.quiche; + requires transitive org.eclipse.jetty.io; + requires org.slf4j; +} diff --git a/jetty-http3/http3-server/pom.xml b/jetty-http3/http3-server/pom.xml index 4f4d6186734..9bd54b468eb 100644 --- a/jetty-http3/http3-server/pom.xml +++ b/jetty-http3/http3-server/pom.xml @@ -26,7 +26,7 @@ org.eclipse.jetty.http3 - cloudflare-quiche-jna + http3-common ${project.version} diff --git a/jetty-http3/pom.xml b/jetty-http3/pom.xml index 1ce48af2393..341925df873 100644 --- a/jetty-http3/pom.xml +++ b/jetty-http3/pom.xml @@ -14,7 +14,9 @@ cloudflare-quiche-jna + http3-common http3-server + http3-client