kinda sorta done the piping work up to Session.process()

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-22 17:16:47 +01:00 committed by Simone Bordet
parent c8d7bf01e1
commit c8e66daefe
4 changed files with 244 additions and 64 deletions

View File

@ -15,7 +15,6 @@ package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
@ -42,14 +41,9 @@ public class HttpClientTransportOverQuic extends AbstractHttpClientTransport
public HttpClientTransportOverQuic()
{
this("http/1.1");
}
public HttpClientTransportOverQuic(String... alpnProtocols)
{
//TODO the Protocol instance should be passed around instead of the alpn string array
connector = new QuicClientConnector(alpnProtocols);
protocol = new Origin.Protocol(Arrays.asList(alpnProtocols), false);
//TODO the ClientConnectionFactory should be built according to the Protocol instance. See HttpClientTransportDynamic
protocol = new Origin.Protocol(HttpClientConnectionFactory.HTTP11.getProtocols(true), false);
connector = new QuicClientConnector(protocol);
addBean(connector);
setConnectionPoolFactory(destination ->
{

View File

@ -13,19 +13,28 @@
package org.eclipse.jetty.http3.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.DatagramChannel;
import java.nio.channels.Selector;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
@ -33,7 +42,9 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.IClientConnector;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -64,10 +75,14 @@ public class QuicClientConnector extends ContainerLifeCycle implements IClientCo
private SocketAddress bindAddress;
private boolean reuseAddress = true;
public QuicClientConnector(String... alpnProtocol)
public QuicClientConnector(Origin.Protocol... protocols)
{
String[] applicationProtos = Arrays.stream(protocols)
.flatMap(protocol -> protocol.getProtocols().stream())
.toArray(String[]::new);
quicheConfig = new QuicheConfig();
quicheConfig.setApplicationProtos(alpnProtocol);
quicheConfig.setApplicationProtos(applicationProtos);
quicheConfig.setMaxIdleTimeout(5000L);
quicheConfig.setInitialMaxData(10000000L);
quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L);
@ -324,7 +339,8 @@ public class QuicClientConnector extends ContainerLifeCycle implements IClientCo
@Override
public void connect(SelectableChannel channel, Object attachment)
{
throw new UnsupportedOperationException("TODO");
ManagedSelector managedSelector = chooseSelector();
managedSelector.submit(new Connect(channel, attachment));
}
@Override
@ -338,17 +354,17 @@ public class QuicClientConnector extends ContainerLifeCycle implements IClientCo
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, context, quicheConfig);
DatagramReader reader = (DatagramReader)attachment;
Map<String, Object> contextMap = reader.getContext();
return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, contextMap);
}
@Override
public void connectionOpened(Connection connection, Object context)
public void connectionOpened(Connection connection, Object attachment)
{
super.connectionOpened(connection, context);
@SuppressWarnings("unchecked")
Map<String, Object> contextMap = (Map<String, Object>)context;
super.connectionOpened(connection, attachment);
DatagramReader reader = (DatagramReader)attachment;
Map<String, Object> contextMap = reader.getContext();
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)contextMap.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
@ -358,9 +374,170 @@ public class QuicClientConnector extends ContainerLifeCycle implements IClientCo
@Override
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(failure, context);
DatagramReader reader = (DatagramReader)attachment;
Map<String, Object> contextMap = reader.getContext();
connectFailed(failure, contextMap);
}
class Connect implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Runnable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SelectableChannel channel;
private final Object attachment;
Connect(SelectableChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
}
@Override
public void update(Selector selector)
{
try
{
channel.register(selector, SelectionKey.OP_WRITE, this);
}
catch (Throwable x)
{
failed(x);
}
}
@Override
public Runnable onSelected()
{
return this;
}
@Override
public void run()
{
Map<String, Object> context = (Map<String, Object>)attachment;
InetSocketAddress remoteSocketAddress = (InetSocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
try
{
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteSocketAddress);
ByteBufferPool bufferPool = getByteBufferPool();
ByteBuffer cipherText = bufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
int pos = BufferUtil.flipToFill(cipherText);
quicheConnection.drainCipherText(cipherText);
//connection.nextTimeout(); // TODO quiche timeout handling is missing for pending connections
BufferUtil.flipToFlush(cipherText, pos);
int sent = ((DatagramChannel)channel).send(cipherText, remoteSocketAddress);// TODO channel send could fail
context.put(quicheConnection.getClass().getName(), quicheConnection);
DatagramReader reader = new DatagramReader(channel, context);
chooseSelector().submit(reader);
}
catch (IOException e)
{
throw new RuntimeIOException(e);
}
}
@Override
public void updateKey()
{
//throw new UnsupportedOperationException();
}
@Override
public void replaceKey(SelectionKey newKey)
{
throw new UnsupportedOperationException();
}
public void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{
IO.close(channel);
selectorManager.connectionFailed(channel, failure, attachment);
}
}
}
class DatagramReader implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Closeable
{
private final SelectableChannel _channel;
private final Map<String, Object> context;
private volatile boolean endPointCreated;
private volatile SelectionKey _key;
DatagramReader(SelectableChannel channel, Map<String, Object> context)
{
_channel = channel;
this.context = context;
}
public Map<String, Object> getContext()
{
return context;
}
@Override
public void update(Selector selector)
{
try
{
_key = _channel.register(selector, SelectionKey.OP_READ, this);
if (LOG.isDebugEnabled())
LOG.debug("{} reader={}", this, _channel);
}
catch (Throwable x)
{
IO.close(_channel);
if (LOG.isDebugEnabled())
LOG.debug("Unable to register OP_READ on selector for {}", _channel, x);
}
}
@Override
public Runnable onSelected()
{
if (LOG.isDebugEnabled())
LOG.debug("DatagramReader onSelected");
if (!endPointCreated)
{
try
{
// TODO needs to be dispatched.
chooseSelector().createEndPoint(_channel, _key);
endPointCreated = true;
}
catch (Throwable x)
{
IO.close(_channel);
// TODO: is this enough of we need to notify someone?
if (LOG.isDebugEnabled())
LOG.debug("createEndPoint failed for channel {}", _channel, x);
}
}
return null;
}
@Override
public void updateKey()
{
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override
public void close() throws IOException
{
// May be called from any thread.
// Implements AbstractConnector.setAccepting(boolean).
chooseSelector().submit(selector -> _key.cancel());
}
}
}
}

View File

@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
@ -36,21 +35,23 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http3.client.QuicClientConnector.REMOTE_SOCKET_ADDRESS_CONTEXT_KEY;
public class QuicConnection extends AbstractConnection
{
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
private final QuicheConfig quicheConfig;
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
private final Scheduler scheduler;
private final Map<String, Object> context;
private final Map<InetSocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context, QuicheConfig quicheConfig)
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final Map<String, Object> context;
private final Flusher flusher = new Flusher();
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context)
{
super(endPoint, executor);
this.quicheConfig = quicheConfig;
this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool;
this.context = context;
@ -72,6 +73,17 @@ public class QuicConnection extends AbstractConnection
public void onOpen()
{
super.onOpen();
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
QuicheConnection quicheConnection = (QuicheConnection)context.get(QuicheConnection.class.getName());
QuicSession session = new QuicSession(getExecutor(), scheduler, byteBufferPool, context, null, quicheConnection, this, remoteAddress);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that accept generated.
if (LOG.isDebugEnabled())
LOG.debug("created connecting QUIC session {}", session);
fillInterested();
}
@ -115,46 +127,33 @@ public class QuicConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("packet contains connection ID {}", quicheConnectionId);
boolean pending = false;
QuicSession session = sessions.get(quicheConnectionId);
if (session == null)
{
if (LOG.isDebugEnabled())
LOG.debug("no existing session with connection ID {}, trying to accept new QUIC connection", quicheConnectionId);
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, remoteAddress, cipherBuffer);
if (quicheConnection == null)
{
ByteBuffer negotiationBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
int pos = BufferUtil.flipToFill(negotiationBuffer);
if (!QuicheConnection.negotiate(remoteAddress, cipherBuffer, negotiationBuffer))
{
if (LOG.isDebugEnabled())
LOG.debug("QUIC connection negotiation failed, dropping packet");
byteBufferPool.release(negotiationBuffer);
continue;
}
BufferUtil.flipToFlush(negotiationBuffer, pos);
ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress);
getEndPoint().write(Callback.from(() -> byteBufferPool.release(negotiationBuffer)), negotiationBuffer);
if (LOG.isDebugEnabled())
LOG.debug("QUIC connection negotiation packet sent");
}
else
{
session = new QuicSession(getExecutor(), scheduler, byteBufferPool, context, quicheConnectionId, quicheConnection, this, remoteAddress);
sessions.putIfAbsent(quicheConnectionId, session);
session.flush(); // send the response packet(s) that accept generated.
if (LOG.isDebugEnabled())
LOG.debug("created QUIC session {} with connection ID {}", session, quicheConnectionId);
}
// Once here, cipherBuffer has been fully consumed.
continue;
session = pendingSessions.get(remoteAddress);
if (session == null)
throw new IllegalStateException("cannot find session with ID " + quicheConnectionId);
pending = true;
session.setConnectionId(quicheConnectionId);
}
else
{
System.out.println("got packet for established session");
}
if (LOG.isDebugEnabled())
LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining());
session.process(remoteAddress, cipherBuffer);
if (pending)
{
if (session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);
sessions.put(quicheConnectionId, session);
}
}
}
}
catch (Throwable x)

View File

@ -52,7 +52,7 @@ public class QuicSession
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final Map<String, Object> context;
private final QuicheConnectionId quicheConnectionId;
private volatile QuicheConnectionId quicheConnectionId;
private final QuicheConnection quicheConnection;
private final QuicConnection connection;
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
@ -129,6 +129,16 @@ public class QuicSession
return remoteAddress;
}
public boolean isConnectionEstablished()
{
return quicheConnection.isConnectionEstablished();
}
public void setConnectionId(QuicheConnectionId quicheConnectionId)
{
this.quicheConnectionId = quicheConnectionId;
}
void process(InetSocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
this.remoteAddress = remoteAddress;