make client use common classes

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-23 13:34:21 +01:00 committed by Simone Bordet
parent 7d303b14c8
commit 8315ba5815
6 changed files with 186 additions and 82 deletions

View File

@ -354,7 +354,7 @@ public class ClientDatagramConnector extends ContainerLifeCycle implements IClie
{ {
Connect connect = (Connect)attachment; Connect connect = (Connect)attachment;
Map<String, Object> contextMap = connect.getContext(); Map<String, Object> contextMap = connect.getContext();
return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, quicheConfig, contextMap); return new ClientQuicConnection(executor, scheduler, byteBufferPool, endPoint, quicheConfig, contextMap);
} }
@Override @Override

View File

@ -0,0 +1,89 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientQuicConnection extends QuicConnection
{
private static final Logger LOG = LoggerFactory.getLogger(ClientQuicConnection.class);
private final Map<InetSocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<String, Object> context;
public ClientQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig, Map<String, Object> context)
{
super(executor, scheduler, byteBufferPool, endp, quicheConfig);
this.context = context;
}
@Override
public void onOpen()
{
super.onOpen();
try
{
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(ClientDatagramConnector.REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
QuicheConnection quicheConnection = QuicheConnection.connect(getQuicheConfig(), remoteAddress);
QuicSession session = new ClientQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), null, quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that connect generated.
if (LOG.isDebugEnabled())
LOG.debug("created connecting QUIC session {}", session);
}
catch (IOException e)
{
throw new RuntimeIOException("Error trying to open connection", e);
}
fillInterested();
}
@Override
protected QuicSession findPendingSession(InetSocketAddress remoteAddress)
{
return pendingSessions.get(remoteAddress);
}
@Override
protected boolean promoteSession(QuicheConnectionId quicheConnectionId, InetSocketAddress remoteAddress)
{
QuicSession session = pendingSessions.get(remoteAddress);
if (session != null && session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);
session.setConnectionId(quicheConnectionId);
session.createStream(0);
return true;
}
return false;
}
}

View File

@ -0,0 +1,61 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicConnection;
import org.eclipse.jetty.http3.common.QuicSession;
import org.eclipse.jetty.http3.common.QuicStreamEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.thread.Scheduler;
public class ClientQuicSession extends QuicSession
{
private final Map<String, Object> context;
protected ClientQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Map<String, Object> context)
{
super(executor, scheduler, byteBufferPool, quicheConnectionId, quicheConnection, connection, remoteAddress);
this.context = context;
}
@Override
protected QuicStreamEndPoint createQuicStreamEndPoint(long streamId)
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientDatagramConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(getScheduler(), this, streamId);
Connection connection;
try
{
connection = connectionFactory.newConnection(endPoint, context);
}
catch (IOException e)
{
throw new RuntimeIOException("Error creating new connection", e);
}
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
}
}

View File

@ -11,26 +11,21 @@
// ======================================================================== // ========================================================================
// //
package org.eclipse.jetty.http3.client; package org.eclipse.jetty.http3.common;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConfig; import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId; import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche; import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
@ -39,29 +34,37 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http3.client.ClientDatagramConnector.REMOTE_SOCKET_ADDRESS_CONTEXT_KEY; public abstract class QuicConnection extends AbstractConnection
public class QuicConnection extends AbstractConnection
{ {
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class); private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
private final Map<InetSocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<String, Object> context;
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>(); private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
private final Scheduler scheduler; private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private final QuicheConfig quicheConfig; private final QuicheConfig quicheConfig;
private final Flusher flusher = new Flusher(); private final Flusher flusher = new Flusher();
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig, Map<String, Object> context) protected QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig)
{ {
super(endp, executor); super(endp, executor);
this.scheduler = scheduler; this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool; this.byteBufferPool = byteBufferPool;
this.quicheConfig = quicheConfig; this.quicheConfig = quicheConfig;
}
this.context = context; public Scheduler getScheduler()
{
return scheduler;
}
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public QuicheConfig getQuicheConfig()
{
return quicheConfig;
} }
void onClose(QuicheConnectionId quicheConnectionId) void onClose(QuicheConnectionId quicheConnectionId)
@ -76,29 +79,6 @@ public class QuicConnection extends AbstractConnection
super.close(); super.close();
} }
@Override
public void onOpen()
{
super.onOpen();
try
{
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteAddress);
QuicSession session = new QuicSession(getExecutor(), scheduler, this.byteBufferPool, null, quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that accept generated.
if (LOG.isDebugEnabled())
LOG.debug("created connecting QUIC session {}", session);
}
catch (IOException e)
{
throw new RuntimeIOException("Error trying to open connection", e);
}
fillInterested();
}
@Override @Override
public void onFillable() public void onFillable()
{ {
@ -139,30 +119,20 @@ public class QuicConnection extends AbstractConnection
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("packet contains connection ID {}", quicheConnectionId); LOG.debug("packet contains connection ID {}", quicheConnectionId);
boolean pending = false;
QuicSession session = sessions.get(quicheConnectionId); QuicSession session = sessions.get(quicheConnectionId);
if (session == null) if (session == null)
{ {
session = pendingSessions.get(remoteAddress); session = findPendingSession(remoteAddress);
if (session == null) if (session == null)
throw new IllegalStateException("cannot find session with ID " + quicheConnectionId); continue;
pending = true;
session.setConnectionId(quicheConnectionId);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining()); LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining());
session.process(remoteAddress, cipherBuffer); session.process(remoteAddress, cipherBuffer);
if (pending) if (promoteSession(quicheConnectionId, remoteAddress))
{ sessions.put(quicheConnectionId, session);
if (session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);
sessions.put(quicheConnectionId, session);
session.createStream(0);
}
}
} }
} }
catch (Throwable x) catch (Throwable x)
@ -173,6 +143,10 @@ public class QuicConnection extends AbstractConnection
} }
} }
protected abstract QuicSession findPendingSession(InetSocketAddress remoteAddress);
protected abstract boolean promoteSession(QuicheConnectionId quicheConnectionId, InetSocketAddress remoteAddress);
public void write(Callback callback, InetSocketAddress remoteAddress, ByteBuffer... buffers) public void write(Callback callback, InetSocketAddress remoteAddress, ByteBuffer... buffers)
{ {
flusher.offer(callback, remoteAddress, buffers); flusher.offer(callback, remoteAddress, buffers);

View File

@ -11,14 +11,13 @@
// ======================================================================== // ========================================================================
// //
package org.eclipse.jetty.http3.client; package org.eclipse.jetty.http3.common;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -30,10 +29,7 @@ import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche; import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.CyclicTimeout; import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
@ -44,12 +40,10 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class QuicSession public abstract class QuicSession
{ {
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final Map<String, Object> context;
private final Flusher flusher; private final Flusher flusher;
private final Scheduler scheduler; private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
@ -62,7 +56,7 @@ public class QuicSession
private InetSocketAddress remoteAddress; private InetSocketAddress remoteAddress;
private QuicheConnectionId quicheConnectionId; private QuicheConnectionId quicheConnectionId;
QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Map<String, Object> context) protected QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
{ {
this.scheduler = scheduler; this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool; this.byteBufferPool = byteBufferPool;
@ -79,8 +73,11 @@ public class QuicSession
} }
}, executor); }, executor);
LifeCycle.start(strategy); LifeCycle.start(strategy);
}
this.context = context; public Scheduler getScheduler()
{
return scheduler;
} }
public void createStream(long streamId) public void createStream(long streamId)
@ -205,7 +202,7 @@ public class QuicSession
strategy.dispatch(); strategy.dispatch();
} }
void flush() public void flush()
{ {
flusher.iterate(); flusher.iterate();
} }
@ -227,24 +224,7 @@ public class QuicSession
return endPoint; return endPoint;
} }
private QuicStreamEndPoint createQuicStreamEndPoint(long streamId) protected abstract QuicStreamEndPoint createQuicStreamEndPoint(long streamId);
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientDatagramConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(scheduler, this, streamId);
Connection connection;
try
{
connection = connectionFactory.newConnection(endPoint, context);
}
catch (IOException e)
{
throw new RuntimeIOException("Error creating new connection", e);
}
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
return endPoint;
}
public void close() public void close()
{ {

View File

@ -11,7 +11,7 @@
// ======================================================================== // ========================================================================
// //
package org.eclipse.jetty.http3.client; package org.eclipse.jetty.http3.common;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;