get rid of promoteSession()

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-31 16:16:07 +02:00 committed by Simone Bordet
parent 2f3587ef6d
commit 4b6bcca529
54 changed files with 669 additions and 1273 deletions

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.client;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -140,11 +142,13 @@ public class Origin
{
private final String host;
private final int port;
private final SocketAddress address;
public Address(String host, int port)
{
this.host = HostPort.normalizeHost(Objects.requireNonNull(host));
this.port = port;
this.address = InetSocketAddress.createUnresolved(getHost(), getPort());
}
public String getHost()
@ -179,6 +183,11 @@ public class Origin
return String.format("%s:%d", host, port);
}
public SocketAddress getSocketAddress()
{
return address;
}
@Override
public String toString()
{

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.client.dynamic;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -179,7 +180,8 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new MultiplexHttpDestination(getHttpClient(), origin);
SocketAddress address = origin.getAddress().getSocketAddress();
return new MultiplexHttpDestination(getHttpClient(), origin, getClientConnector().isIntrinsicallySecure(address));
}
@Override
@ -195,7 +197,9 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans
}
else
{
if (destination.isSecure() && protocol.isNegotiate())
SocketAddress address = destination.getOrigin().getAddress().getSocketAddress();
boolean intrinsicallySecure = getClientConnector().isIntrinsicallySecure(address);
if (!intrinsicallySecure && destination.isSecure() && protocol.isNegotiate())
{
factory = new ALPNClientConnectionFactory(getClientConnector().getExecutor(), this::newNegotiatedConnection, protocol.getProtocols());
}

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
@ -68,7 +69,8 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new DuplexHttpDestination(getHttpClient(), origin, getClientConnector().isIntrinsicallySecure());
SocketAddress address = origin.getAddress().getSocketAddress();
return new DuplexHttpDestination(getHttpClient(), origin, getClientConnector().isIntrinsicallySecure(address));
}
@Override

View File

@ -193,7 +193,7 @@ public class HttpSenderOverHTTP extends HttpSender
}
case FLUSH:
{
final EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
if (headerBuffer == null)
headerBuffer = BufferUtil.EMPTY_BUFFER;
if (chunkBuffer == null)
@ -202,6 +202,7 @@ public class HttpSenderOverHTTP extends HttpSender
contentBuffer = BufferUtil.EMPTY_BUFFER;
long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining();
getHttpChannel().getHttpConnection().addBytesOut(bytes);
// TODO: see notes at QuicSession.onOpen().
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
generated = true;
return Action.SCHEDULED;

View File

@ -117,7 +117,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new MultiplexHttpDestination(getHttpClient(), origin);
SocketAddress address = origin.getAddress().getSocketAddress();
return new MultiplexHttpDestination(getHttpClient(), origin, getHTTP2Client().getClientConnector().isIntrinsicallySecure(address));
}
@Override

View File

@ -1,260 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.io.Closeable;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QuicClientConnector extends ClientConnector
{
private static final Logger LOG = LoggerFactory.getLogger(QuicClientConnector.class);
@Override
public boolean isConnectBlocking()
{
return false;
}
@Override
public void setConnectBlocking(boolean connectBlocking)
{
}
@Override
public boolean isIntrinsicallySecure()
{
return true;
}
@Override
protected SelectorManager newSelectorManager()
{
return new QuicSelectorManager(getExecutor(), getScheduler(), 1);
}
@Override
public void connect(SocketAddress address, Map<String, Object> context)
{
DatagramChannel channel = null;
try
{
SelectorManager selectorManager = getBean(SelectorManager.class);
if (context == null)
context = new HashMap<>();
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
context.putIfAbsent(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY, address);
channel = DatagramChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
{
boolean reuseAddress = getReuseAddress();
if (LOG.isDebugEnabled())
LOG.debug("Binding to {} to connect to {}{}", bindAddress, address, (reuseAddress ? " reusing address" : ""));
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
channel.bind(bindAddress);
}
configure(channel);
if (LOG.isDebugEnabled())
LOG.debug("Connecting to {}", address);
channel.configureBlocking(false);
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
IO.close(channel);
connectFailed(x, context);
}
}
@Override
public void accept(SelectableChannel selectableChannel, Map<String, Object> context)
{
throw new UnsupportedOperationException();
}
@Override
protected void configure(SelectableChannel selectableChannel)
{
}
@Override
protected EndPoint newEndPoint(SelectableChannel selectableChannel, ManagedSelector selector, SelectionKey selectionKey)
{
return new QuicDatagramEndPoint((DatagramChannel)selectableChannel, selector, selectionKey, getScheduler());
}
protected class QuicSelectorManager extends SelectorManager
{
public QuicSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
public void connect(SelectableChannel channel, Object attachment)
{
ManagedSelector managedSelector = chooseSelector();
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
managedSelector.submit(new Connect(channel, context));
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
EndPoint endPoint = QuicClientConnector.this.newEndPoint(channel, selector, selectionKey);
endPoint.setIdleTimeout(getIdleTimeout().toMillis());
return endPoint;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment)
{
Connect connect = (Connect)attachment;
Map<String, Object> contextMap = connect.getContext();
return new ClientQuicConnection(getExecutor(), getScheduler(), getByteBufferPool(), endPoint, contextMap);
}
@Override
public void connectionOpened(Connection connection, Object attachment)
{
super.connectionOpened(connection, attachment);
Connect connect = (Connect)attachment;
Map<String, Object> contextMap = connect.getContext();
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)contextMap.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.succeeded(connection);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
Connect connect = (Connect)attachment;
Map<String, Object> contextMap = connect.getContext();
connectFailed(failure, contextMap);
}
class Connect implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Runnable, Closeable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SelectableChannel channel;
private final Map<String, Object> context;
private volatile SelectionKey key;
Connect(SelectableChannel channel, Map<String, Object> context)
{
this.channel = channel;
this.context = context;
}
public Map<String, Object> getContext()
{
return context;
}
@Override
public void update(Selector selector)
{
try
{
key = channel.register(selector, SelectionKey.OP_WRITE, this);
}
catch (Throwable x)
{
failed(x);
}
}
@Override
public Runnable onSelected()
{
key.interestOps(0);
return this;
}
@Override
public void run()
{
try
{
chooseSelector().createEndPoint(channel, key);
}
catch (Throwable x)
{
failed(x);
}
}
@Override
public void updateKey()
{
}
@Override
public void replaceKey(SelectionKey newKey)
{
key = newKey;
}
@Override
public void close()
{
// May be called from any thread.
// Implements AbstractConnector.setAccepting(boolean).
chooseSelector().submit(selector -> key.cancel());
}
private void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{
IO.close(channel);
connectFailed(failure, context);
}
}
}
}
}

View File

@ -1,163 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http3.server.ServerQuicConnector;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class End2EndClientTest
{
private Server server;
private HttpClient client;
@BeforeEach
public void setUp() throws Exception
{
server = new Server();
HttpConnectionFactory connectionFactory = new HttpConnectionFactory();
ServerQuicConnector connector = new ServerQuicConnector(server, connectionFactory);
connector.setPort(8443);
server.addConnector(connector);
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
PrintWriter writer = response.getWriter();
writer.println("<html>\n" +
"\t<body>\n" +
"\t\tRequest served\n" +
"\t</body>\n" +
"</html>");
}
});
server.start();
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(new QuicClientConnector());
client = new HttpClient(transport);
client.start();
}
@AfterEach
public void tearDown()
{
LifeCycle.stop(server);
LifeCycle.stop(client);
}
@Test
public void simple() throws Exception
{
ContentResponse response = client.GET("https://localhost:8443/");
int status = response.getStatus();
String contentAsString = response.getContentAsString();
System.out.println("==========");
System.out.println("Status: " + status);
System.out.println(contentAsString);
System.out.println("==========");
assertThat(status, is(200));
assertThat(contentAsString, is("<html>\n" +
"\t<body>\n" +
"\t\tRequest served\n" +
"\t</body>\n" +
"</html>\n"));
}
@Test
public void multiple() throws Exception
{
for (int i = 0; i < 1000; i++)
{
ContentResponse response = client.GET("https://localhost:8443/");
int status = response.getStatus();
String contentAsString = response.getContentAsString();
assertThat(status, is(200));
assertThat(contentAsString, is("<html>\n" +
"\t<body>\n" +
"\t\tRequest served\n" +
"\t</body>\n" +
"</html>\n"));
}
}
@Test
public void multiThreaded() throws Exception
{
ExecutorService executor = Executors.newFixedThreadPool(2);
try
{
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++)
{
futures.add(executor.submit(() ->
{
try
{
ContentResponse response = client.GET("https://localhost:8443/");
int status = response.getStatus();
String contentAsString = response.getContentAsString();
assertThat(status, is(200));
assertThat(contentAsString, is("<html>\n" +
"\t<body>\n" +
"\t\tRequest served\n" +
"\t</body>\n" +
"</html>\n"));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}));
}
for (Future<?> future : futures)
{
future.get();
}
}
finally
{
executor.shutdownNow();
}
}
}

View File

@ -1,3 +0,0 @@
# Jetty Logging using jetty-slf4j-impl
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.http3.LEVEL=DEBUG

View File

@ -1,444 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.common;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Objects;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QuicDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(QuicDatagramEndPoint.class);
/**
* {@link #fill(ByteBuffer)} needs to pass the {@link InetSocketAddress} together with the buffer
* and {@link #flush(ByteBuffer...)} needs the {@link InetSocketAddress} passed together with the buffer.
* Since we cannot change the {@link org.eclipse.jetty.io.EndPoint} API, the {@link InetSocketAddress}
* argument must be passed on the side with this thread-local.
*
* Note: a first implementation was encoding the InetSocketAddress in the buffer(s) but this was as complex
* and required a mildly expensive encode-decode cycle each time one of those two methods was called.
* This mechanism is as complex and brittle but virtually as cheap as standard argument passing.
*/
public static final InetAddressArgument INET_ADDRESS_ARGUMENT = new InetAddressArgument();
private final AutoLock _lock = new AutoLock();
private final DatagramChannel _channel;
private final ManagedSelector _selector;
private SelectionKey _key;
private boolean _updatePending;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable
{
final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("%s:%s:%s", QuicDatagramEndPoint.this, _operation, getInvocationType());
}
}
private abstract class RunnableCloseable extends RunnableTask implements Closeable
{
protected RunnableCloseable(String op)
{
super(op);
}
@Override
public void close()
{
try
{
QuicDatagramEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", QuicDatagramEndPoint.this, x);
}
}
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@Override
public InvocationType getInvocationType()
{
return getFillInterest().getCallbackInvocationType();
}
@Override
public void run()
{
getFillInterest().fillable();
}
};
private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{
@Override
public InvocationType getInvocationType()
{
return getWriteFlusher().getCallbackInvocationType();
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
@Override
public String toString()
{
return String.format("%s:%s:%s->%s", QuicDatagramEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
}
};
private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{
@Override
public InvocationType getInvocationType()
{
InvocationType fillT = getFillInterest().getCallbackInvocationType();
InvocationType flushT = getWriteFlusher().getCallbackInvocationType();
if (fillT == flushT)
return fillT;
if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING)
return InvocationType.EITHER;
if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER)
return InvocationType.EITHER;
return InvocationType.BLOCKING;
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
getFillInterest().fillable();
}
};
public QuicDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel = channel;
_selector = selector;
_key = key;
}
@Override
public InetSocketAddress getLocalAddress()
{
return (InetSocketAddress)_channel.socket().getLocalSocketAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return null;
}
@Override
public boolean isOpen()
{
return _channel.isOpen();
}
@Override
protected void doShutdownOutput()
{
}
@Override
public void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("doClose {}", this);
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug("Unable to close channel", e);
}
finally
{
super.doClose();
}
}
@Override
public void onClose(Throwable cause)
{
try
{
super.onClose(cause);
}
finally
{
if (_selector != null)
_selector.destroyEndPoint(this, cause);
}
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
if (isInputShutdown())
return -1;
int pos = BufferUtil.flipToFill(buffer);
InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer);
if (peer == null)
{
BufferUtil.flipToFlush(buffer, pos);
return 0;
}
INET_ADDRESS_ARGUMENT.push(peer);
notIdle();
BufferUtil.flipToFlush(buffer, pos);
int filled = buffer.remaining();
if (LOG.isDebugEnabled())
LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer));
return filled;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
boolean flushedAll = true;
long flushed = 0;
try
{
InetSocketAddress peer = INET_ADDRESS_ARGUMENT.pop();
if (LOG.isDebugEnabled())
LOG.debug("flushing {} buffer(s) to {}", buffers.length, peer);
for (ByteBuffer buffer : buffers)
{
int sent = _channel.send(buffer, peer);
if (sent == 0)
{
flushedAll = false;
break;
}
flushed += sent;
}
if (LOG.isDebugEnabled())
LOG.debug("flushed {} byte(s), all flushed? {} - {}", flushed, flushedAll, this);
}
catch (IOException e)
{
throw new EofException(e);
}
if (flushed > 0)
notIdle();
return flushedAll;
}
@Override
public Object getTransport()
{
return _channel;
}
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
@Override
public Runnable onSelected()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override
public void updateKey()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
try
{
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for {}", this, x);
close();
}
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
private void changeInterests(int operation)
{
// This method runs from any thread, possibly
// concurrently with updateKey() and onSelected().
int oldInterestOps;
int newInterestOps;
boolean pending;
try (AutoLock l = _lock.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector != null)
_selector.submit(_updateKeyAction);
}
@Override
public String toEndPointString()
{
// We do a best effort to print the right toString() and that's it.
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toEndPointString(),
_currentInterestOps,
_desiredInterestOps,
ManagedSelector.safeInterestOps(_key),
ManagedSelector.safeReadyOps(_key));
}
public static final class InetAddressArgument
{
private final ThreadLocal<InetSocketAddress> threadLocal = new ThreadLocal<>();
public void push(InetSocketAddress inetSocketAddress)
{
Objects.requireNonNull(inetSocketAddress);
threadLocal.set(inetSocketAddress);
}
public InetSocketAddress pop()
{
InetSocketAddress inetSocketAddress = threadLocal.get();
Objects.requireNonNull(inetSocketAddress);
threadLocal.remove();
return inetSocketAddress;
}
}
}

View File

@ -3,20 +3,20 @@
<parent>
<artifactId>jetty-project</artifactId>
<groupId>org.eclipse.jetty</groupId>
<version>10.0.2-SNAPSHOT</version>
<version>10.0.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-parent</artifactId>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-parent</artifactId>
<packaging>pom</packaging>
<name>Jetty :: HTTP3</name>
<name>Jetty :: QUIC</name>
<modules>
<module>cloudflare-quiche-jna</module>
<module>http3-common</module>
<module>http3-server</module>
<module>http3-client</module>
<module>quic-quiche</module>
<module>quic-common</module>
<module>quic-server</module>
<module>quic-client</module>
</modules>
</project>

View File

@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-parent</artifactId>
<version>10.0.2-SNAPSHOT</version>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-parent</artifactId>
<version>10.0.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http3-client</artifactId>
<name>Jetty :: HTTP3 :: Client</name>
<artifactId>quic-client</artifactId>
<name>Jetty :: QUIC :: Client</name>
<properties>
<bundle-symbolic-name>${project.groupId}.client</bundle-symbolic-name>
@ -20,8 +20,8 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-common</artifactId>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@ -31,8 +31,20 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-server</artifactId>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

View File

@ -11,12 +11,12 @@
// ========================================================================
//
module org.eclipse.jetty.http3.client
module org.eclipse.jetty.quic.client
{
exports org.eclipse.jetty.http3.client;
exports org.eclipse.jetty.quic.client;
requires org.eclipse.jetty.http3.common;
requires org.eclipse.jetty.http3.quiche;
requires org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.quiche;
requires org.eclipse.jetty.client;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.util;

View File

@ -11,10 +11,11 @@
// ========================================================================
//
package org.eclipse.jetty.http3.client;
package org.eclipse.jetty.quic.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@ -23,30 +24,37 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.quiche.QuicheConfig;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The client specific implementation of {@link QuicConnection}.</p>
* <p>For each {@link ClientConnector#connect(SocketAddress, Map)} operation,
* a new {@link DatagramChannelEndPoint} is created with an associated
* {@code ClientQuicConnection}.</p>
*/
public class ClientQuicConnection extends QuicConnection
{
private static final Logger LOG = LoggerFactory.getLogger(ClientQuicConnection.class);
private final Map<InetSocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<SocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<String, Object> context;
public ClientQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, Map<String, Object> context)
public ClientQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context)
{
super(executor, scheduler, byteBufferPool, endp);
super(executor, scheduler, byteBufferPool, endPoint);
this.context = context;
}
@ -93,7 +101,7 @@ public class ClientQuicConnection extends QuicConnection
{
super.closeSession(quicheConnectionId, session, x);
InetSocketAddress remoteAddress = session.getRemoteAddress();
SocketAddress remoteAddress = session.getRemoteAddress();
if (pendingSessions.remove(remoteAddress) != null)
{
Promise<?> promise = (Promise<?>)context.get(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY);
@ -103,25 +111,19 @@ public class ClientQuicConnection extends QuicConnection
}
@Override
protected QuicSession createSession(InetSocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{
QuicSession session = pendingSessions.get(remoteAddress);
if (session != null)
session.process(remoteAddress, cipherBuffer);
return session;
}
@Override
protected boolean promoteSession(QuicheConnectionId quicheConnectionId, QuicSession session)
{
InetSocketAddress remoteAddress = session.getRemoteAddress();
if (pendingSessions.containsKey(remoteAddress) && session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);
session.setConnectionId(quicheConnectionId);
session.createStream(0); // TODO perform proper stream ID generation
return true;
session.process(remoteAddress, cipherBuffer);
if (session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);
session.onOpen();
return session;
}
}
return false;
return null;
}
}

View File

@ -11,24 +11,30 @@
// ========================================================================
//
package org.eclipse.jetty.http3.client;
package org.eclipse.jetty.quic.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.common.QuicStreamEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>The client specific implementation of {@link QuicSession}.</p>
* <p>When asked to create a QUIC stream, it creates a {@link QuicStreamEndPoint}
* with an associated {@link Connection} created from the {@link ClientConnectionFactory},
* retrieved from the connection context map.</p>
*/
public class ClientQuicSession extends QuicSession
{
private final Map<String, Object> context;
@ -42,20 +48,19 @@ public class ClientQuicSession extends QuicSession
@Override
protected QuicStreamEndPoint createQuicStreamEndPoint(long streamId)
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(getScheduler(), this, streamId);
Connection connection;
try
{
connection = connectionFactory.newConnection(endPoint, context);
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(getScheduler(), this, streamId);
Connection connection = connectionFactory.newConnection(endPoint, context);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
}
catch (IOException e)
catch (IOException x)
{
throw new RuntimeIOException("Error creating new connection", e);
throw new RuntimeIOException("Error creating new connection", x);
}
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
}
}

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.quic.client;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Map;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
public class QuicClientConnectorConfigurator extends ClientConnector.Configurator
{
@Override
public boolean isIntrinsicallySecure(ClientConnector clientConnector, SocketAddress address)
{
return true;
}
@Override
public ChannelWithAddress newChannelWithAddress(ClientConnector clientConnector, SocketAddress address, Map<String, Object> context) throws IOException
{
DatagramChannel channel = DatagramChannel.open();
return new ChannelWithAddress(channel, address);
}
@Override
public EndPoint newEndPoint(ClientConnector clientConnector, SocketAddress address, SelectableChannel selectable, ManagedSelector selector, SelectionKey selectionKey)
{
return new DatagramChannelEndPoint((DatagramChannel)selectable, selector, selectionKey, clientConnector.getScheduler());
}
@Override
public Connection newConnection(ClientConnector clientConnector, SocketAddress address, EndPoint endPoint, Map<String, Object> context)
{
return new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context);
}
}

View File

@ -0,0 +1,152 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.quic.client;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.CompletableFuture;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class End2EndClientTest
{
private Server server;
private ServerQuicConnector connector;
private HttpClient client;
private final String responseContent = "" +
"<html>\n" +
"\t<body>\n" +
"\t\tRequest served\n" +
"\t</body>\n" +
"</html>";
@BeforeEach
public void setUp() throws Exception
{
server = new Server();
HttpConfiguration httpConfiguration = new HttpConfiguration();
HttpConnectionFactory http1 = new HttpConnectionFactory(httpConfiguration);
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(httpConfiguration);
connector = new ServerQuicConnector(server, http1, http2);
server.addConnector(connector);
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
PrintWriter writer = response.getWriter();
writer.print(responseContent);
}
});
server.start();
ClientConnectionFactory.Info http1Info = HttpClientConnectionFactory.HTTP11;
ClientConnectionFactoryOverHTTP2.HTTP2 http2Info = new ClientConnectionFactoryOverHTTP2.HTTP2(new HTTP2Client());
HttpClientTransportDynamic transport = new HttpClientTransportDynamic(new ClientConnector(new QuicClientConnectorConfigurator()), http1Info, http2Info);
client = new HttpClient(transport);
client.start();
}
@AfterEach
public void tearDown()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
@Test
public void testSimpleHTTP1() throws Exception
{
ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort());
assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent));
}
@Test
public void testSimpleHTTP2() throws Exception
{
ContentResponse response = client.newRequest("https://localhost:" + connector.getLocalPort())
.version(HttpVersion.HTTP_2)
.send();
assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent));
}
@Test
public void testManyHTTP1() throws Exception
{
for (int i = 0; i < 1000; i++)
{
ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort());
assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent));
}
}
@Test
public void testMultiThreadedHTTP1()
{
int count = 1000;
CompletableFuture<?>[] futures = new CompletableFuture[count];
for (int i = 0; i < count; ++i)
{
futures[i] = CompletableFuture.runAsync(() ->
{
try
{
ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort());
assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
CompletableFuture.allOf(futures).join();
}
}

View File

@ -0,0 +1,3 @@
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO

View File

@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-parent</artifactId>
<version>10.0.2-SNAPSHOT</version>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-parent</artifactId>
<version>10.0.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http3-common</artifactId>
<name>Jetty :: HTTP3 :: Common</name>
<artifactId>quic-common</artifactId>
<name>Jetty :: QUIC :: Common</name>
<properties>
<bundle-symbolic-name>${project.groupId}.common</bundle-symbolic-name>
@ -20,8 +20,8 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>cloudflare-quiche-jna</artifactId>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-quiche</artifactId>
<version>${project.version}</version>
</dependency>

View File

@ -11,11 +11,11 @@
// ========================================================================
//
module org.eclipse.jetty.http3.common
module org.eclipse.jetty.quic.common
{
exports org.eclipse.jetty.http3.common;
exports org.eclipse.jetty.quic.common;
requires transitive org.eclipse.jetty.http3.quiche;
requires transitive org.eclipse.jetty.quic.quiche;
requires transitive org.eclipse.jetty.io;
requires org.slf4j;
}

View File

@ -11,21 +11,23 @@
// ========================================================================
//
package org.eclipse.jetty.http3.common;
package org.eclipse.jetty.quic.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@ -34,6 +36,13 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link Connection} implementation that receives and sends datagram packets via its associated datagram {@link EndPoint}.</p>
* <p>The received bytes are peeked to obtain the QUIC connection ID; each QUIC connection ID has an associated
* {@link QuicSession}, and the received bytes are then passed to the {@link QuicSession} for processing.</p>
* <p>On the receive side, a QuicConnection <em>fans-out</em> to multiple {@link QuicSession}s.</p>
* <p>On the send side, many {@link QuicSession}s <em>fan-in</em> to a QuicConnection.</p>
*/
public abstract class QuicConnection extends AbstractConnection
{
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
@ -43,9 +52,9 @@ public abstract class QuicConnection extends AbstractConnection
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
protected QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp)
protected QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint)
{
super(endp, executor);
super(endPoint, executor);
this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool;
}
@ -62,7 +71,8 @@ public abstract class QuicConnection extends AbstractConnection
protected void closeSession(QuicheConnectionId quicheConnectionId, QuicSession session, Throwable x)
{
LOG.debug("closing session of type {} cid={}", getClass().getSimpleName(), quicheConnectionId);
if (LOG.isDebugEnabled())
LOG.debug("closing session of type {} cid={}", getClass().getSimpleName(), quicheConnectionId);
if (quicheConnectionId != null)
sessions.remove(quicheConnectionId);
}
@ -70,11 +80,13 @@ public abstract class QuicConnection extends AbstractConnection
@Override
public void close()
{
LOG.debug("closing connection of type {}", getClass().getSimpleName());
if (LOG.isDebugEnabled())
LOG.debug("closing connection of type {}", getClass().getSimpleName());
sessions.values().forEach(QuicSession::close);
sessions.clear();
super.close();
LOG.debug("closed connection of type {}", getClass().getSimpleName());
if (LOG.isDebugEnabled())
LOG.debug("closed connection of type {}", getClass().getSimpleName());
}
@Override
@ -87,7 +99,8 @@ public abstract class QuicConnection extends AbstractConnection
while (true)
{
BufferUtil.clear(cipherBuffer);
int fill = getEndPoint().fill(cipherBuffer);
SocketAddress remoteAddress = getEndPoint().receive(cipherBuffer);
int fill = remoteAddress == DatagramChannelEndPoint.EOF ? -1 : cipherBuffer.remaining();
if (LOG.isDebugEnabled())
LOG.debug("filled cipher buffer with {} byte(s)", fill);
// ServerDatagramEndPoint will only return -1 if input is shut down.
@ -104,7 +117,6 @@ public abstract class QuicConnection extends AbstractConnection
return;
}
InetSocketAddress remoteAddress = QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
if (LOG.isDebugEnabled())
LOG.debug("peer IP address: {}, ciphertext packet size: {}", remoteAddress, cipherBuffer.remaining());
@ -121,9 +133,21 @@ public abstract class QuicConnection extends AbstractConnection
QuicSession session = sessions.get(quicheConnectionId);
if (session == null)
{
if (LOG.isDebugEnabled())
LOG.debug("packet is for unknown session, trying to create a new one");
session = createSession(remoteAddress, cipherBuffer);
if (session != null && promoteSession(quicheConnectionId, session))
if (session != null)
{
if (LOG.isDebugEnabled())
LOG.debug("session created");
session.setConnectionId(quicheConnectionId);
sessions.put(quicheConnectionId, session);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("session not created");
}
continue;
}
@ -139,11 +163,9 @@ public abstract class QuicConnection extends AbstractConnection
}
}
protected abstract QuicSession createSession(InetSocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException;
protected abstract QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException;
protected abstract boolean promoteSession(QuicheConnectionId quicheConnectionId, QuicSession session);
public void write(Callback callback, InetSocketAddress remoteAddress, ByteBuffer... buffers)
public void write(Callback callback, SocketAddress remoteAddress, ByteBuffer... buffers)
{
flusher.offer(callback, remoteAddress, buffers);
}
@ -154,7 +176,7 @@ public abstract class QuicConnection extends AbstractConnection
private final ArrayDeque<Entry> queue = new ArrayDeque<>();
private Entry entry;
public void offer(Callback callback, InetSocketAddress address, ByteBuffer[] buffers)
public void offer(Callback callback, SocketAddress address, ByteBuffer[] buffers)
{
try (AutoLock l = lock.lock())
{
@ -173,8 +195,7 @@ public abstract class QuicConnection extends AbstractConnection
if (entry == null)
return Action.IDLE;
QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
getEndPoint().write(this, entry.buffers);
getEndPoint().write(this, entry.address, entry.buffers);
return Action.SCHEDULED;
}
@ -192,6 +213,12 @@ public abstract class QuicConnection extends AbstractConnection
super.failed(x);
}
@Override
public InvocationType getInvocationType()
{
return entry.callback.getInvocationType();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
@ -201,10 +228,10 @@ public abstract class QuicConnection extends AbstractConnection
private class Entry
{
private final Callback callback;
private final InetSocketAddress address;
private final SocketAddress address;
private final ByteBuffer[] buffers;
private Entry(Callback callback, InetSocketAddress address, ByteBuffer[] buffers)
private Entry(Callback callback, SocketAddress address, ByteBuffer[] buffers)
{
this.callback = callback;
this.address = address;

View File

@ -11,10 +11,10 @@
// ========================================================================
//
package org.eclipse.jetty.http3.common;
package org.eclipse.jetty.quic.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
@ -24,22 +24,32 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Represents a logical connection with a remote peer, identified by a QUIC connection ID.</p>
* <p>Each QuicSession maintains a number of QUIC streams, identified by their QUIC stream ID;
* Each QUIC stream is wrapped in an {@link EndPoint}, namely {@link QuicStreamEndPoint}.</p>
* <p>Bytes received from a {@link QuicConnection} in {@link #process(SocketAddress, ByteBuffer)}
* are passed to Quiche for processing; in turn, Quiche produces a list of QUIC stream IDs that
* have pending I/O events, either read-ready or write-ready.</p>
* <p>On the receive side, a QuicSession <em>fans-out</em> to multiple {@link QuicStreamEndPoint}s.</p>
* <p>On the send side, many {@link QuicStreamEndPoint}s <em>fan-in</em> to a QuicSession.</p>
*/
public abstract class QuicSession
{
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
@ -53,10 +63,10 @@ public abstract class QuicSession
private final ExecutionStrategy strategy;
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private InetSocketAddress remoteAddress;
private SocketAddress remoteAddress;
private QuicheConnectionId quicheConnectionId;
protected QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
protected QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress)
{
this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool;
@ -64,13 +74,7 @@ public abstract class QuicSession
this.connection = connection;
this.remoteAddress = remoteAddress;
this.flusher = new Flusher(scheduler);
this.strategy = new EatWhatYouKill(() ->
{
try (AutoLock l = strategyQueueLock.lock())
{
return strategyQueue.poll();
}
}, executor);
this.strategy = new AdaptiveExecutionStrategy(new Producer(), executor);
LifeCycle.start(strategy);
}
@ -84,9 +88,9 @@ public abstract class QuicSession
return quicheConnection.getNegotiatedProtocol();
}
public void createStream(long streamId)
public void onOpen()
{
getOrCreateStreamEndPoint(streamId);
getOrCreateStreamEndPoint(0);
}
public int fill(long streamId, ByteBuffer buffer) throws IOException
@ -127,12 +131,12 @@ public abstract class QuicSession
endpoints.remove(streamId);
}
InetSocketAddress getLocalAddress()
public SocketAddress getLocalAddress()
{
return connection.getEndPoint().getLocalAddress();
return connection.getEndPoint().getLocalSocketAddress();
}
public InetSocketAddress getRemoteAddress()
public SocketAddress getRemoteAddress()
{
return remoteAddress;
}
@ -147,7 +151,7 @@ public abstract class QuicSession
this.quicheConnectionId = quicheConnectionId;
}
public void process(InetSocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
this.remoteAddress = remoteAddress;
quicheConnection.feedCipherText(cipherBufferIn);
@ -159,14 +163,7 @@ public abstract class QuicSession
LOG.debug("writable stream ids: {}", writableStreamIds);
if (!writableStreamIds.isEmpty())
{
Runnable onWritable = () ->
{
for (Long writableStreamId : writableStreamIds)
{
onWritable(writableStreamId);
}
};
dispatch(onWritable);
dispatch(new WritableStreamsTask(writableStreamIds));
}
List<Long> readableStreamIds = quicheConnection.readableStreamIds();
@ -174,8 +171,7 @@ public abstract class QuicSession
LOG.debug("readable stream ids: {}", readableStreamIds);
for (Long readableStreamId : readableStreamIds)
{
Runnable onReadable = () -> onReadable(readableStreamId);
dispatch(onReadable);
dispatch(new ReadableStreamTask(readableStreamId));
}
}
else
@ -287,7 +283,7 @@ public abstract class QuicSession
quicheConnection.onTimeout();
if (LOG.isDebugEnabled())
LOG.debug("re-iterating quiche after timeout cid={}", quicheConnectionId);
// do not use the timer thread to iterate
// Do not use the timer thread to iterate.
dispatch(() -> iterate());
}
};
@ -340,6 +336,12 @@ public abstract class QuicSession
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
protected void onCompleteSuccess()
{
@ -357,4 +359,51 @@ public abstract class QuicSession
QuicSession.this.close(cause);
}
}
private class Producer implements ExecutionStrategy.Producer
{
@Override
public Runnable produce()
{
try (AutoLock l = strategyQueueLock.lock())
{
return strategyQueue.poll();
}
}
}
private class WritableStreamsTask implements Runnable
{
private final List<Long> streamIds;
private WritableStreamsTask(List<Long> streamIds)
{
this.streamIds = streamIds;
}
@Override
public void run()
{
for (long streamId : streamIds)
{
onWritable(streamId);
}
}
}
private class ReadableStreamTask implements Runnable
{
private final long streamId;
private ReadableStreamTask(long streamId)
{
this.streamId = streamId;
}
@Override
public void run()
{
onReadable(streamId);
}
}
}

View File

@ -11,18 +11,25 @@
// ========================================================================
//
package org.eclipse.jetty.http3.common;
package org.eclipse.jetty.quic.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>An {@link EndPoint} implementation on top of a QUIC stream.</p>
* <p>The correspondent {@link Connection} associated to this QuicStreamEndPoint
* parses and generates the protocol specific bytes transported by QUIC.</p>
*/
public class QuicStreamEndPoint extends AbstractEndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
@ -38,13 +45,13 @@ public class QuicStreamEndPoint extends AbstractEndPoint
}
@Override
public InetSocketAddress getLocalAddress()
public SocketAddress getLocalSocketAddress()
{
return session.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
public SocketAddress getRemoteSocketAddress()
{
return session.getRemoteAddress();
}
@ -93,7 +100,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
catch (IOException e)
{
if (LOG.isDebugEnabled())
LOG.debug("Error sending FIN on stream {}", streamId, e);
LOG.debug("error closing stream {}", streamId, e);
}
super.onClose(failure);
session.onClose(streamId);
@ -142,11 +149,15 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public void onWritable()
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is writable", streamId);
getWriteFlusher().completeWrite();
}
public void onReadable()
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable", streamId);
getFillInterest().fillable();
}

View File

@ -0,0 +1,44 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
/**
* <p>This module contains the main abstractions for the QUIC protocol.</p>
* <p>A {@link org.eclipse.jetty.quic.common.QuicConnection} is a {@link org.eclipse.jetty.io.Connection}
* that receives and sends bytes from its underlying datagram {@link org.eclipse.jetty.io.EndPoint}.</p>
* <p>A {@link org.eclipse.jetty.quic.common.QuicConnection} manages many {@link org.eclipse.jetty.quic.common.QuicSession}s,
* one for each QUIC connection ID.</p>
* <p>A {@link org.eclipse.jetty.quic.common.QuicSession} manages many QUIC streams, identified by a
* stream ID and represented by an {@link org.eclipse.jetty.io.EndPoint} subclass, namely
* {@link org.eclipse.jetty.quic.common.QuicStreamEndPoint}.</p>
* <p>The {@link org.eclipse.jetty.io.Connection} associated with each {@link org.eclipse.jetty.quic.common.QuicStreamEndPoint}
* parses the bytes received on that QUIC stream, and generates the bytes to send on that QUIC stream.</p>
* <p>For example, on the server side, the layout of the components in case of HTTP/1.1 could be the following:</p>
* <pre>
* CLIENT | SERVER
*
* clientA ServerQuicSessionA
* \ /
* DatagramChannelEndPoint -- ServerQuicConnection
* / \
* clientB ServerQuicSessionB -- QuicStreamEndPointB1 -- HttpConnection
* </pre>
* <p>The {@code DatagramChannelEndPoint} receives UDP datagrams from clients.</p>
* <p>{@code ServerQuicConnection} processes the incoming datagram bytes creating a {@code ServerQuicSession} for every
* QUIC connection ID sent by the clients.</p>
* <p>{@code clientB} has created a single QUIC stream to send a single HTTP/1.1 request, which results in
* {@code ServerQuicSessionB} to create a single {@code QuicStreamEndPointB1} with its associated {@code HttpConnection}.</p>
* <p>Note that the path {@code DatagramChannelEndPoint - ServerQuicConnection - ServerQuicSessionB - QuicStreamEndPointB1}
* behaves exactly like a TCP {@link org.eclipse.jetty.io.SocketChannelEndPoint} for the associated
* {@code HttpConnection}.</p>
*/
package org.eclipse.jetty.quic.common;

View File

@ -1,17 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-parent</artifactId>
<version>10.0.2-SNAPSHOT</version>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-parent</artifactId>
<version>10.0.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudflare-quiche-jna</artifactId>
<name>Jetty :: HTTP3 :: Cloudflare Quiche JNA</name>
<artifactId>quic-quiche</artifactId>
<name>Jetty :: QUIC :: Quiche</name>
<properties>
<bundle-symbolic-name>${project.groupId}.cloudflare-quiche-jna</bundle-symbolic-name>
<bundle-symbolic-name>${project.groupId}.quic-quiche</bundle-symbolic-name>
</properties>
<build>

View File

@ -12,10 +12,10 @@
//
// The module must be open to allow JNA to find the native lib.
open module org.eclipse.jetty.http3.quiche
open module org.eclipse.jetty.quic.quiche
{
exports org.eclipse.jetty.http3.quiche.ffi;
exports org.eclipse.jetty.http3.quiche;
exports org.eclipse.jetty.quic.quiche.ffi;
exports org.eclipse.jetty.quic.quiche;
requires org.slf4j;
requires com.sun.jna;

View File

@ -11,9 +11,9 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche;
package org.eclipse.jetty.quic.quiche;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
public class QuicheConfig
{

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche;
package org.eclipse.jetty.quic.quiche;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -21,25 +21,20 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.http3.quiche.ffi.bool_pointer;
import org.eclipse.jetty.http3.quiche.ffi.char_pointer;
import org.eclipse.jetty.http3.quiche.ffi.size_t;
import org.eclipse.jetty.http3.quiche.ffi.size_t_pointer;
import org.eclipse.jetty.http3.quiche.ffi.ssize_t;
import org.eclipse.jetty.http3.quiche.ffi.uint32_t;
import org.eclipse.jetty.http3.quiche.ffi.uint32_t_pointer;
import org.eclipse.jetty.http3.quiche.ffi.uint64_t;
import org.eclipse.jetty.http3.quiche.ffi.uint64_t_pointer;
import org.eclipse.jetty.http3.quiche.ffi.uint8_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.quic.quiche.ffi.bool_pointer;
import org.eclipse.jetty.quic.quiche.ffi.char_pointer;
import org.eclipse.jetty.quic.quiche.ffi.size_t;
import org.eclipse.jetty.quic.quiche.ffi.size_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.ssize_t;
import org.eclipse.jetty.quic.quiche.ffi.uint32_t;
import org.eclipse.jetty.quic.quiche.ffi.uint32_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.uint64_t;
import org.eclipse.jetty.quic.quiche.ffi.uint64_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.uint8_t_pointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.INSTANCE;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.QUICHE_MAX_CONN_ID_LEN;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.quiche_error.QUICHE_ERR_DONE;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.quiche_error.errToString;
public class QuicheConnection
{
private static final Logger LOG = LoggerFactory.getLogger(QuicheConnection.class);
@ -71,27 +66,27 @@ public class QuicheConnection
byte[] scid = new byte[connectionIdLength];
SECURE_RANDOM.nextBytes(scid);
LibQuiche.quiche_config libQuicheConfig = buildConfig(quicheConfig);
LibQuiche.quiche_conn quicheConn = INSTANCE.quiche_connect(peer.getHostName(), scid, new size_t(scid.length), libQuicheConfig);
LibQuiche.quiche_conn quicheConn = LibQuiche.INSTANCE.quiche_connect(peer.getHostName(), scid, new size_t(scid.length), libQuicheConfig);
return new QuicheConnection(quicheConn, libQuicheConfig);
}
private static LibQuiche.quiche_config buildConfig(QuicheConfig config) throws IOException
{
LibQuiche.quiche_config quicheConfig = INSTANCE.quiche_config_new(new uint32_t(config.getVersion()));
LibQuiche.quiche_config quicheConfig = LibQuiche.INSTANCE.quiche_config_new(new uint32_t(config.getVersion()));
if (quicheConfig == null)
throw new IOException("Failed to create quiche config");
Boolean verifyPeer = config.getVerifyPeer();
if (verifyPeer != null)
INSTANCE.quiche_config_verify_peer(quicheConfig, verifyPeer);
LibQuiche.INSTANCE.quiche_config_verify_peer(quicheConfig, verifyPeer);
String certChainPemPath = config.getCertChainPemPath();
if (certChainPemPath != null)
INSTANCE.quiche_config_load_cert_chain_from_pem_file(quicheConfig, certChainPemPath);
LibQuiche.INSTANCE.quiche_config_load_cert_chain_from_pem_file(quicheConfig, certChainPemPath);
String privKeyPemPath = config.getPrivKeyPemPath();
if (privKeyPemPath != null)
INSTANCE.quiche_config_load_priv_key_from_pem_file(quicheConfig, privKeyPemPath);
LibQuiche.INSTANCE.quiche_config_load_priv_key_from_pem_file(quicheConfig, privKeyPemPath);
String[] applicationProtos = config.getApplicationProtos();
if (applicationProtos != null)
@ -100,44 +95,44 @@ public class QuicheConnection
for (String proto : applicationProtos)
sb.append((char)proto.length()).append(proto);
String theProtos = sb.toString();
INSTANCE.quiche_config_set_application_protos(quicheConfig, theProtos, new size_t(theProtos.length()));
LibQuiche.INSTANCE.quiche_config_set_application_protos(quicheConfig, theProtos, new size_t(theProtos.length()));
}
QuicheConfig.CongestionControl cc = config.getCongestionControl();
if (cc != null)
INSTANCE.quiche_config_set_cc_algorithm(quicheConfig, cc.getValue());
LibQuiche.INSTANCE.quiche_config_set_cc_algorithm(quicheConfig, cc.getValue());
Long maxIdleTimeout = config.getMaxIdleTimeout();
if (maxIdleTimeout != null)
INSTANCE.quiche_config_set_max_idle_timeout(quicheConfig, new uint64_t(maxIdleTimeout));
LibQuiche.INSTANCE.quiche_config_set_max_idle_timeout(quicheConfig, new uint64_t(maxIdleTimeout));
Long initialMaxData = config.getInitialMaxData();
if (initialMaxData != null)
INSTANCE.quiche_config_set_initial_max_data(quicheConfig, new uint64_t(initialMaxData));
LibQuiche.INSTANCE.quiche_config_set_initial_max_data(quicheConfig, new uint64_t(initialMaxData));
Long initialMaxStreamDataBidiLocal = config.getInitialMaxStreamDataBidiLocal();
if (initialMaxStreamDataBidiLocal != null)
INSTANCE.quiche_config_set_initial_max_stream_data_bidi_local(quicheConfig, new uint64_t(initialMaxStreamDataBidiLocal));
LibQuiche.INSTANCE.quiche_config_set_initial_max_stream_data_bidi_local(quicheConfig, new uint64_t(initialMaxStreamDataBidiLocal));
Long initialMaxStreamDataBidiRemote = config.getInitialMaxStreamDataBidiRemote();
if (initialMaxStreamDataBidiRemote != null)
INSTANCE.quiche_config_set_initial_max_stream_data_bidi_remote(quicheConfig, new uint64_t(initialMaxStreamDataBidiRemote));
LibQuiche.INSTANCE.quiche_config_set_initial_max_stream_data_bidi_remote(quicheConfig, new uint64_t(initialMaxStreamDataBidiRemote));
Long initialMaxStreamDataUni = config.getInitialMaxStreamDataUni();
if (initialMaxStreamDataUni != null)
INSTANCE.quiche_config_set_initial_max_stream_data_uni(quicheConfig, new uint64_t(initialMaxStreamDataUni));
LibQuiche.INSTANCE.quiche_config_set_initial_max_stream_data_uni(quicheConfig, new uint64_t(initialMaxStreamDataUni));
Long initialMaxStreamsBidi = config.getInitialMaxStreamsBidi();
if (initialMaxStreamsBidi != null)
INSTANCE.quiche_config_set_initial_max_streams_bidi(quicheConfig, new uint64_t(initialMaxStreamsBidi));
LibQuiche.INSTANCE.quiche_config_set_initial_max_streams_bidi(quicheConfig, new uint64_t(initialMaxStreamsBidi));
Long initialMaxStreamsUni = config.getInitialMaxStreamsUni();
if (initialMaxStreamsUni != null)
INSTANCE.quiche_config_set_initial_max_streams_uni(quicheConfig, new uint64_t(initialMaxStreamsUni));
LibQuiche.INSTANCE.quiche_config_set_initial_max_streams_uni(quicheConfig, new uint64_t(initialMaxStreamsUni));
Boolean disableActiveMigration = config.getDisableActiveMigration();
if (disableActiveMigration != null)
INSTANCE.quiche_config_set_disable_active_migration(quicheConfig, disableActiveMigration);
LibQuiche.INSTANCE.quiche_config_set_disable_active_migration(quicheConfig, disableActiveMigration);
return quicheConfig;
}
@ -153,16 +148,16 @@ public class QuicheConnection
uint8_t_pointer type = new uint8_t_pointer();
uint32_t_pointer version = new uint32_t_pointer();
byte[] scid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] scid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer scid_len = new size_t_pointer(scid.length);
byte[] dcid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] dcid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer dcid_len = new size_t_pointer(dcid.length);
byte[] token = new byte[32];
size_t_pointer token_len = new size_t_pointer(token.length);
int rc = INSTANCE.quiche_header_info(packet, new size_t(packet.remaining()), new size_t(QUICHE_MAX_CONN_ID_LEN),
int rc = LibQuiche.INSTANCE.quiche_header_info(packet, new size_t(packet.remaining()), new size_t(LibQuiche.QUICHE_MAX_CONN_ID_LEN),
version, type,
scid, scid_len,
dcid, dcid_len,
@ -184,18 +179,18 @@ public class QuicheConnection
uint32_t_pointer version = new uint32_t_pointer();
// Source Connection ID
byte[] scid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] scid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer scid_len = new size_t_pointer(scid.length);
// Destination Connection ID
byte[] dcid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] dcid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer dcid_len = new size_t_pointer(dcid.length);
byte[] token = new byte[32];
size_t_pointer token_len = new size_t_pointer(token.length);
LOG.debug(" getting header info...");
int rc = INSTANCE.quiche_header_info(packetRead, new size_t(packetRead.remaining()), new size_t(QUICHE_MAX_CONN_ID_LEN),
int rc = LibQuiche.INSTANCE.quiche_header_info(packetRead, new size_t(packetRead.remaining()), new size_t(LibQuiche.QUICHE_MAX_CONN_ID_LEN),
version, type,
scid, scid_len,
dcid, dcid_len,
@ -210,11 +205,11 @@ public class QuicheConnection
LOG.debug("dcid len: {}", dcid_len);
LOG.debug("token len: {}", token_len);
if (!INSTANCE.quiche_version_is_supported(version.getPointee()))
if (!LibQuiche.INSTANCE.quiche_version_is_supported(version.getPointee()))
{
LOG.debug(" < version negotiation");
ssize_t generated = INSTANCE.quiche_negotiate_version(scid, scid_len.getPointee(), dcid, dcid_len.getPointee(), packetToSend, new size_t(packetToSend.remaining()));
ssize_t generated = LibQuiche.INSTANCE.quiche_negotiate_version(scid, scid_len.getPointee(), dcid, dcid_len.getPointee(), packetToSend, new size_t(packetToSend.remaining()));
packetToSend.position(packetToSend.position() + generated.intValue());
if (generated.intValue() < 0)
throw new IOException("failed to create vneg packet : " + generated);
@ -227,10 +222,10 @@ public class QuicheConnection
token = tokenMinter.mint(dcid, (int)dcid_len.getValue());
byte[] newCid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] newCid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
SECURE_RANDOM.nextBytes(newCid);
ssize_t generated = INSTANCE.quiche_retry(scid, scid_len.getPointee(),
ssize_t generated = LibQuiche.INSTANCE.quiche_retry(scid, scid_len.getPointee(),
dcid, dcid_len.getPointee(),
newCid, new size_t(newCid.length),
token, new size_t(token.length),
@ -256,18 +251,18 @@ public class QuicheConnection
uint32_t_pointer version = new uint32_t_pointer();
// Source Connection ID
byte[] scid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] scid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer scid_len = new size_t_pointer(scid.length);
// Destination Connection ID
byte[] dcid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] dcid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer dcid_len = new size_t_pointer(dcid.length);
byte[] token = new byte[32];
size_t_pointer token_len = new size_t_pointer(token.length);
LOG.debug(" getting header info...");
int rc = INSTANCE.quiche_header_info(packetRead, new size_t(packetRead.remaining()), new size_t(QUICHE_MAX_CONN_ID_LEN),
int rc = LibQuiche.INSTANCE.quiche_header_info(packetRead, new size_t(packetRead.remaining()), new size_t(LibQuiche.QUICHE_MAX_CONN_ID_LEN),
version, type,
scid, scid_len,
dcid, dcid_len,
@ -281,7 +276,7 @@ public class QuicheConnection
LOG.debug("dcid len: {}", dcid_len);
LOG.debug("token len: {}", token_len);
if (!INSTANCE.quiche_version_is_supported(version.getPointee()))
if (!LibQuiche.INSTANCE.quiche_version_is_supported(version.getPointee()))
{
LOG.debug(" < need version negotiation");
return null;
@ -302,10 +297,10 @@ public class QuicheConnection
LOG.debug(" connection creation...");
LibQuiche.quiche_config libQuicheConfig = buildConfig(quicheConfig);
LibQuiche.quiche_conn quicheConn = INSTANCE.quiche_accept(dcid, dcid_len.getPointee(), odcid, new size_t(odcid.length), libQuicheConfig);
LibQuiche.quiche_conn quicheConn = LibQuiche.INSTANCE.quiche_accept(dcid, dcid_len.getPointee(), odcid, new size_t(odcid.length), libQuicheConfig);
if (quicheConn == null)
{
INSTANCE.quiche_config_free(libQuicheConfig);
LibQuiche.INSTANCE.quiche_config_free(libQuicheConfig);
throw new IOException("failed to create connection");
}
@ -360,7 +355,7 @@ public class QuicheConnection
int received = libQuiche().quiche_conn_recv(quicheConn, buffer, new size_t(buffer.remaining())).intValue();
if (received < 0)
throw new IOException("Quiche failed to receive packet; err=" + errToString(received));
throw new IOException("Quiche failed to receive packet; err=" + LibQuiche.quiche_error.errToString(received));
buffer.position(buffer.position() + received);
return received;
}
@ -376,7 +371,7 @@ public class QuicheConnection
if (quicheConn == null)
throw new IOException("Cannot send when not connected");
int written = libQuiche().quiche_conn_send(quicheConn, buffer, new size_t(buffer.remaining())).intValue();
if (written == QUICHE_ERR_DONE)
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to send packet; err=" + LibQuiche.quiche_error.errToString(written));
@ -436,7 +431,7 @@ public class QuicheConnection
int rc = libQuiche().quiche_conn_close(quicheConn, true, new uint64_t(0), null, new size_t(0));
if (rc == 0)
return true;
if (rc == QUICHE_ERR_DONE)
if (rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return false;
throw new IOException("failed to close connection: " + LibQuiche.quiche_error.errToString(rc));
}
@ -460,7 +455,7 @@ public class QuicheConnection
{
long value = libQuiche().quiche_conn_stream_capacity(quicheConn, new uint64_t(streamId)).longValue();
if (value < 0)
throw new IOException("Quiche failed to read capacity of stream " + streamId + "; err=" + errToString(value));
throw new IOException("Quiche failed to read capacity of stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(value));
return value;
}
@ -468,7 +463,7 @@ public class QuicheConnection
{
int direction = writeSide ? LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_WRITE : LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_READ;
int rc = libQuiche().quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(0));
if (rc == 0 || rc == QUICHE_ERR_DONE)
if (rc == 0 || rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return;
throw new IOException("failed to shutdown stream " + streamId + ": " + LibQuiche.quiche_error.errToString(rc));
}
@ -476,7 +471,7 @@ public class QuicheConnection
public synchronized void feedFinForStream(long streamId) throws IOException
{
int written = libQuiche().quiche_conn_stream_send(quicheConn, new uint64_t(streamId), null, new size_t(0), true).intValue();
if (written == QUICHE_ERR_DONE)
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return;
if (written < 0L)
throw new IOException("Quiche failed to write FIN to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
@ -485,7 +480,7 @@ public class QuicheConnection
public synchronized int feedClearTextForStream(long streamId, ByteBuffer buffer) throws IOException
{
int written = libQuiche().quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), false).intValue();
if (written == QUICHE_ERR_DONE)
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to write to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
@ -497,7 +492,7 @@ public class QuicheConnection
{
bool_pointer fin = new bool_pointer();
int written = libQuiche().quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue();
if (written == QUICHE_ERR_DONE)
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
@ -514,7 +509,7 @@ public class QuicheConnection
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
return INSTANCE;
return LibQuiche.INSTANCE;
}
public interface TokenMinter

View File

@ -11,21 +11,18 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche;
package org.eclipse.jetty.quic.quiche;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.http3.quiche.ffi.size_t;
import org.eclipse.jetty.http3.quiche.ffi.size_t_pointer;
import org.eclipse.jetty.http3.quiche.ffi.uint32_t_pointer;
import org.eclipse.jetty.http3.quiche.ffi.uint8_t_pointer;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.INSTANCE;
import static org.eclipse.jetty.http3.quiche.ffi.LibQuiche.QUICHE_MAX_CONN_ID_LEN;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.quic.quiche.ffi.size_t;
import org.eclipse.jetty.quic.quiche.ffi.size_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.uint32_t_pointer;
import org.eclipse.jetty.quic.quiche.ffi.uint8_t_pointer;
public class QuicheConnectionId
{
@ -78,17 +75,17 @@ public class QuicheConnectionId
uint32_t_pointer version = new uint32_t_pointer();
// Source Connection ID
byte[] scid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] scid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer scid_len = new size_t_pointer(scid.length);
// Destination Connection ID
byte[] dcid = new byte[QUICHE_MAX_CONN_ID_LEN];
byte[] dcid = new byte[LibQuiche.QUICHE_MAX_CONN_ID_LEN];
size_t_pointer dcid_len = new size_t_pointer(dcid.length);
byte[] token = new byte[32];
size_t_pointer token_len = new size_t_pointer(token.length);
int rc = INSTANCE.quiche_header_info(packet, new size_t(packet.remaining()), new size_t(QUICHE_MAX_CONN_ID_LEN),
int rc = LibQuiche.INSTANCE.quiche_header_info(packet, new size_t(packet.remaining()), new size_t(LibQuiche.QUICHE_MAX_CONN_ID_LEN),
version, type,
scid, scid_len,
dcid, dcid_len,

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@ -21,7 +21,6 @@ import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.Structure;
import com.sun.jna.ptr.PointerByReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.ptr.ByReference;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import java.nio.charset.Charset;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.IntegerType;
import com.sun.jna.Native;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.Native;
import com.sun.jna.ptr.ByReference;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.IntegerType;
import com.sun.jna.Native;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.IntegerType;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.ptr.ByReference;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.IntegerType;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.ptr.ByReference;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.IntegerType;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.quiche.ffi;
package org.eclipse.jetty.quic.quiche.ffi;
import com.sun.jna.ptr.ByReference;

View File

@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-parent</artifactId>
<version>10.0.2-SNAPSHOT</version>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-parent</artifactId>
<version>10.0.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http3-server</artifactId>
<name>Jetty :: HTTP3 :: Server</name>
<artifactId>quic-server</artifactId>
<name>Jetty :: QUIC :: Server</name>
<properties>
<bundle-symbolic-name>${project.groupId}.server</bundle-symbolic-name>
@ -25,8 +25,8 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-common</artifactId>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -11,12 +11,12 @@
// ========================================================================
//
module org.eclipse.jetty.http3.server
module org.eclipse.jetty.quic.server
{
exports org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.quic.server;
requires org.eclipse.jetty.http3.common;
requires org.eclipse.jetty.http3.quiche;
requires org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.quiche;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.server;
requires org.eclipse.jetty.util;

View File

@ -11,21 +11,23 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.quiche.QuicheConfig;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.quic.server.internal.SimpleTokenMinter;
import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -33,6 +35,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The server specific implementation of {@link QuicConnection}.</p>
*/
public class ServerQuicConnection extends QuicConnection
{
private static final Logger LOG = LoggerFactory.getLogger(ServerQuicConnection.class);
@ -55,18 +60,18 @@ public class ServerQuicConnection extends QuicConnection
}
@Override
protected QuicSession createSession(InetSocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{
ByteBufferPool byteBufferPool = getByteBufferPool();
// TODO make the token validator configurable
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, new SimpleTokenValidator(remoteAddress), cipherBuffer);
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, new SimpleTokenValidator((InetSocketAddress)remoteAddress), cipherBuffer);
if (quicheConnection == null)
{
// TODO make the buffer size configurable
ByteBuffer negotiationBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
int pos = BufferUtil.flipToFill(negotiationBuffer);
// TODO make the token minter configurable
if (!QuicheConnection.negotiate(new SimpleTokenMinter(remoteAddress), cipherBuffer, negotiationBuffer))
if (!QuicheConnection.negotiate(new SimpleTokenMinter((InetSocketAddress)remoteAddress), cipherBuffer, negotiationBuffer))
{
if (LOG.isDebugEnabled())
LOG.debug("QUIC connection negotiation failed, dropping packet");
@ -87,11 +92,4 @@ public class ServerQuicConnection extends QuicConnection
return session;
}
}
@Override
protected boolean promoteSession(QuicheConnectionId quicheConnectionId, QuicSession session)
{
session.setConnectionId(quicheConnectionId);
return true;
}
}

View File

@ -11,28 +11,26 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.quic.quiche.QuicheConfig;
import org.eclipse.jetty.quic.server.internal.SSLKeyPair;
import org.eclipse.jetty.server.AbstractNetworkConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Server;
@ -40,6 +38,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.thread.Scheduler;
// TODO: add configuration and figure out interaction with SslContextFactory.
public class ServerQuicConnector extends AbstractNetworkConnector
{
private final ServerDatagramSelectorManager _manager;
@ -96,6 +95,12 @@ public class ServerQuicConnector extends AbstractNetworkConnector
this(server, null, null, null, 1, factories);
}
@Override
public int getLocalPort()
{
return _localPort;
}
@Override
protected void doStart() throws Exception
{
@ -196,108 +201,17 @@ public class ServerQuicConnector extends AbstractNetworkConnector
}
@Override
public void accept(SelectableChannel channel, Object attachment)
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
ManagedSelector selector = chooseSelector();
selector.submit(new Accept(channel, attachment));
EndPoint endPoint = new DatagramChannelEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
endPoint.setIdleTimeout(getIdleTimeout());
return endPoint;
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new QuicDatagramEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
return new ServerQuicConnection(getExecutor(), getScheduler(), getByteBufferPool(), endpoint, _quicheConfig, ServerQuicConnector.this);
}
@Override
public String toString()
{
return String.format("DatagramSelectorManager@%s", ServerQuicConnector.this);
}
class Accept implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Runnable, Closeable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SelectableChannel _channel;
private final Object _attachment;
private volatile SelectionKey _key;
Accept(SelectableChannel channel, Object attachment)
{
_channel = channel;
_attachment = attachment;
}
@Override
public void update(Selector selector)
{
try
{
_key = _channel.register(selector, SelectionKey.OP_READ, this);
if (LOG.isDebugEnabled())
LOG.debug("{} reader={}", this, _channel);
}
catch (Throwable x)
{
failed(x);
}
}
@Override
public Runnable onSelected()
{
if (LOG.isDebugEnabled())
LOG.debug("Accept onSelected");
_key.interestOps(0);
return this;
}
@Override
public void run()
{
try
{
chooseSelector().createEndPoint(_channel, _key);
}
catch (Throwable x)
{
failed(x);
}
}
@Override
public void updateKey()
{
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override
public void close() throws IOException
{
// May be called from any thread.
// Implements AbstractConnector.setAccepting(boolean).
chooseSelector().submit(selector -> _key.cancel());
}
private void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{
IO.close(_channel);
_manager.connectionFailed(_channel, failure, _attachment);
}
}
}
}
}

View File

@ -11,28 +11,34 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.common.QuicStreamEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>The server specific implementation of {@link QuicSession}.</p>
* <p>When asked to create a QUIC stream, it creates a {@link QuicStreamEndPoint}
* with an associated {@link Connection} created from the {@link ConnectionFactory},
* retrieved from the server {@link Connector}, correspondent to the protocol
* negotiated with the client (or the default protocol).</p>
*/
public class ServerQuicSession extends QuicSession
{
private final Connector connector;
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Connector connector)
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
{
super(executor, scheduler, byteBufferPool, quicheConnection, connection, remoteAddress);
this.connector = connector;

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server.internal;
import java.io.File;
import java.io.FileInputStream;
@ -43,13 +43,6 @@ public class SSLKeyPair
private final Certificate[] certChain;
private final String alias;
public SSLKeyPair(Key key, Certificate[] certChain, String alias)
{
this.key = key;
this.certChain = certChain;
this.alias = alias;
}
public SSLKeyPair(File storeFile, String storeType, char[] storePassword, String alias, char[] keyPassword) throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException, IOException, CertificateException
{
KeyStore keyStore = KeyStore.getInstance(storeType);
@ -73,17 +66,17 @@ public class SSLKeyPair
try (FileOutputStream fos = new FileOutputStream(files[0]))
{
writeAsPem(fos, key);
writeAsPEM(fos, key);
}
try (FileOutputStream fos = new FileOutputStream(files[1]))
{
for (Certificate cert : certChain)
writeAsPem(fos, cert);
writeAsPEM(fos, cert);
}
return files;
}
private void writeAsPem(OutputStream outputStream, Key key) throws IOException
private void writeAsPEM(OutputStream outputStream, Key key) throws IOException
{
byte[] encoded = encoder.encode(key.getEncoded());
outputStream.write(BEGIN_KEY);
@ -94,7 +87,7 @@ public class SSLKeyPair
outputStream.write(LINE_SEPARATOR);
}
private void writeAsPem(OutputStream outputStream, Certificate certificate) throws CertificateEncodingException, IOException
private void writeAsPEM(OutputStream outputStream, Certificate certificate) throws CertificateEncodingException, IOException
{
byte[] encoded = encoder.encode(certificate.getEncoded());
outputStream.write(BEGIN_CERT);

View File

@ -11,13 +11,13 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server.internal;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
public class SimpleTokenMinter implements QuicheConnection.TokenMinter
{

View File

@ -11,14 +11,14 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server.internal;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
public class SimpleTokenValidator implements QuicheConnection.TokenValidator
{

View File

@ -11,13 +11,10 @@
// ========================================================================
//
package org.eclipse.jetty.http3.server;
package org.eclipse.jetty.quic.server;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -50,7 +47,7 @@ public class ServerQuicConnectorTest
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
PrintWriter writer = response.getWriter();
@ -87,24 +84,14 @@ public class ServerQuicConnectorTest
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
int contentLength = 16 * 1024 * 1024;
response.setContentLength(contentLength);
response.setContentType("text/plain");
ServletOutputStream outputStream = response.getOutputStream();
File file = new File("./src/test/resources/zfs-intro.pdf");
response.setContentLengthLong(file.length());
byte[] buffer = new byte[1024];
try (FileInputStream fis = new FileInputStream(file))
{
while (true)
{
int read = fis.read(buffer);
if (read == -1)
break;
outputStream.write(buffer, 0, read);
}
}
outputStream.println("0".repeat(contentLength));
}
});

View File

@ -199,7 +199,7 @@
<module>jetty-io</module>
<module>jetty-http</module>
<module>jetty-http2</module>
<module>jetty-http3</module>
<module>jetty-quic</module>
<module>jetty-server</module>
<module>jetty-xml</module>
<module>jetty-security</module>