better align client and server quic sessions

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-23 12:30:37 +01:00 committed by Simone Bordet
parent a1b4eafc73
commit 7d303b14c8
4 changed files with 39 additions and 17 deletions

View File

@ -85,7 +85,7 @@ public class QuicConnection extends AbstractConnection
{
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteAddress);
QuicSession session = new QuicSession(getExecutor(), scheduler, this.byteBufferPool, context, null, quicheConnection, this, remoteAddress);
QuicSession session = new QuicSession(getExecutor(), scheduler, this.byteBufferPool, null, quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that accept generated.
if (LOG.isDebugEnabled())

View File

@ -48,11 +48,11 @@ public class QuicSession
{
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final Map<String, Object> context;
private final Flusher flusher;
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final Map<String, Object> context;
private volatile QuicheConnectionId quicheConnectionId;
private final QuicheConnection quicheConnection;
private final QuicConnection connection;
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
@ -60,12 +60,12 @@ public class QuicSession
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private InetSocketAddress remoteAddress;
private QuicheConnectionId quicheConnectionId;
QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, Map<String, Object> context, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Map<String, Object> context)
{
this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool;
this.context = context;
this.quicheConnectionId = quicheConnectionId;
this.quicheConnection = quicheConnection;
this.connection = connection;
@ -79,6 +79,8 @@ public class QuicSession
}
}, executor);
LifeCycle.start(strategy);
this.context = context;
}
public void createStream(long streamId)
@ -229,18 +231,19 @@ public class QuicSession
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientDatagramConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(scheduler, this, streamId);
Connection connection;
try
{
Connection connection = connectionFactory.newConnection(endPoint, context);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
connection = connectionFactory.newConnection(endPoint, context);
}
catch (IOException e)
{
throw new RuntimeIOException("Error creating new connection", e);
}
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
}
public void close()

View File

@ -144,7 +144,7 @@ public class QuicConnection extends AbstractConnection
}
else
{
session = new QuicSession(connector, getExecutor(), scheduler, byteBufferPool, quicheConnectionId, quicheConnection, this, remoteAddress);
session = new QuicSession(getExecutor(), scheduler, byteBufferPool, quicheConnectionId, quicheConnection, this, remoteAddress, connector);
sessions.putIfAbsent(quicheConnectionId, session);
session.flush(); // send the response packet(s) that accept generated.
if (LOG.isDebugEnabled())

View File

@ -48,10 +48,11 @@ public class QuicSession
{
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final Flusher flusher;
private final Connector connector;
private final Flusher flusher;
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final QuicheConnectionId quicheConnectionId;
private final QuicheConnection quicheConnection;
private final QuicConnection connection;
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
@ -59,10 +60,11 @@ public class QuicSession
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private InetSocketAddress remoteAddress;
private QuicheConnectionId quicheConnectionId;
QuicSession(Connector connector, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Connector connector)
{
this.connector = connector;
this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool;
this.quicheConnectionId = quicheConnectionId;
this.quicheConnection = quicheConnection;
@ -77,6 +79,13 @@ public class QuicSession
}
}, executor);
LifeCycle.start(strategy);
this.connector = connector;
}
public void createStream(long streamId)
{
getOrCreateStreamEndPoint(streamId);
}
public int fill(long streamId, ByteBuffer buffer) throws IOException
@ -127,6 +136,16 @@ public class QuicSession
return remoteAddress;
}
public boolean isConnectionEstablished()
{
return quicheConnection.isConnectionEstablished();
}
public void setConnectionId(QuicheConnectionId quicheConnectionId)
{
this.quicheConnectionId = quicheConnectionId;
}
void process(InetSocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
this.remoteAddress = remoteAddress;
@ -197,7 +216,7 @@ public class QuicSession
{
if (quicStreamEndPoint == null)
{
quicStreamEndPoint = createQuicStreamEndPoint(connector, connector.getScheduler(), streamId);
quicStreamEndPoint = createQuicStreamEndPoint(streamId);
if (LOG.isDebugEnabled())
LOG.debug("creating endpoint for stream {}", sid);
}
@ -208,7 +227,7 @@ public class QuicSession
return endPoint;
}
private QuicStreamEndPoint createQuicStreamEndPoint(Connector connector, Scheduler scheduler, long streamId)
private QuicStreamEndPoint createQuicStreamEndPoint(long streamId)
{
String negotiatedProtocol = quicheConnection.getNegotiatedProtocol();
ConnectionFactory connectionFactory = connector.getConnectionFactory(negotiatedProtocol);