kinda sorta working datagram connector

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-12 12:59:43 +01:00 committed by Simone Bordet
parent ccfe4ec90f
commit 45e45e6cf2
5 changed files with 392 additions and 3 deletions

View File

@ -0,0 +1,81 @@
package org.eclipse.jetty.http3.server;
import java.util.EventListener;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
public class QuicConnection implements Connection
{
@Override
public void addEventListener(EventListener listener)
{
}
@Override
public void removeEventListener(EventListener listener)
{
}
@Override
public void onOpen()
{
}
@Override
public void onClose(Throwable cause)
{
}
@Override
public EndPoint getEndPoint()
{
return null;
}
@Override
public void close()
{
}
@Override
public boolean onIdleExpired()
{
return false;
}
@Override
public long getMessagesIn()
{
return 0;
}
@Override
public long getMessagesOut()
{
return 0;
}
@Override
public long getBytesIn()
{
return 0;
}
@Override
public long getBytesOut()
{
return 0;
}
@Override
public long getCreatedTimeStamp()
{
return 0;
}
}

View File

@ -3,11 +3,14 @@ package org.eclipse.jetty.http3.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.EventListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
@ -17,6 +20,8 @@ import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.AbstractNetworkConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.Name;
@ -24,9 +29,11 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class ServerDatagramConnector extends AbstractNetworkConnector
{
private final ConcurrentMap<SocketAddress, ServerDatagramEndPoint> _acceptedChannels = new ConcurrentHashMap<>();
private final SelectorManager _manager;
private volatile DatagramChannel _datagramChannel;
private volatile int _localPort = -1;
private Closeable _acceptor;
public ServerDatagramConnector(
@Name("server") Server server,
@ -60,11 +67,13 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
for (EventListener l : getBeans(SelectorManager.SelectorManagerListener.class))
_manager.addEventListener(l);
super.doStart();
_acceptor = _manager.datagramReader(_datagramChannel);
}
@Override
protected void doStop() throws Exception
{
IO.close(_acceptor);
super.doStop();
for (EventListener l : getBeans(EventListener.class))
_manager.removeEventListener(l);
@ -145,7 +154,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
throw new UnsupportedOperationException(getClass().getSimpleName() + " has no accept mechanism");
}
public class ServerDatagramSelectorManager extends SelectorManager
private class ServerDatagramSelectorManager extends SelectorManager
{
protected ServerDatagramSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
@ -155,12 +164,40 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return null;
ManagedSelector.PeerAware attachment = (ManagedSelector.PeerAware)selectionKey.attachment();
ServerDatagramEndPoint serverDatagramEndPoint = _acceptedChannels.get(attachment.peer());
serverDatagramEndPoint.init(selector, selectionKey);
return serverDatagramEndPoint;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
//TODO: return quic connection
//return new QuicConnection();
return new HttpConnection(new HttpConfiguration(), ServerDatagramConnector.this, endpoint, false);
}
@Override
protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException
{
ByteBuffer buffer = getByteBufferPool().acquire(1200, true);
LOG.info("doReadDatagram {}", channel);
DatagramChannel datagramChannel = (DatagramChannel)channel;
SocketAddress peer = datagramChannel.receive(buffer);
SocketAddress localAddress = datagramChannel.getLocalAddress();
boolean[] created = new boolean[1];
ServerDatagramEndPoint endPoint = _acceptedChannels.computeIfAbsent(peer, remoteAddress ->
{
ServerDatagramEndPoint endp = new ServerDatagramEndPoint(localAddress, remoteAddress, buffer, datagramChannel);
created[0] = true;
return endp;
});
if (created[0])
return peer;
endPoint.onData(buffer);
return null;
}

View File

@ -0,0 +1,171 @@
package org.eclipse.jetty.http3.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerDatagramEndPoint implements EndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
public ServerDatagramEndPoint(SocketAddress localAddress, SocketAddress remoteAddress, ByteBuffer buffer, SelectableChannel channel)
{
}
public void init(ManagedSelector selector, SelectionKey selectionKey)
{
}
public void onData(ByteBuffer buffer)
{
}
@Override
public InetSocketAddress getLocalAddress()
{
throw new UnsupportedOperationException();
}
@Override
public InetSocketAddress getRemoteAddress()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isOpen()
{
return false;
}
@Override
public long getCreatedTimeStamp()
{
throw new UnsupportedOperationException();
}
@Override
public void shutdownOutput()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isOutputShutdown()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isInputShutdown()
{
throw new UnsupportedOperationException();
}
@Override
public void close(Throwable cause)
{
throw new UnsupportedOperationException();
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public boolean flush(ByteBuffer... buffer) throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public Object getTransport()
{
throw new UnsupportedOperationException();
}
@Override
public long getIdleTimeout()
{
throw new UnsupportedOperationException();
}
@Override
public void setIdleTimeout(long idleTimeout)
{
throw new UnsupportedOperationException();
}
@Override
public void fillInterested(Callback callback) throws ReadPendingException
{
throw new UnsupportedOperationException();
}
@Override
public boolean tryFillInterested(Callback callback)
{
throw new UnsupportedOperationException();
}
@Override
public boolean isFillInterested()
{
throw new UnsupportedOperationException();
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
throw new UnsupportedOperationException();
}
@Override
public Connection getConnection()
{
return connection;
}
private Connection connection;
@Override
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public void onOpen()
{
LOG.info("onOpen");
}
@Override
public void onClose(Throwable cause)
{
throw new UnsupportedOperationException();
}
@Override
public void upgrade(Connection newConnection)
{
throw new UnsupportedOperationException();
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
@ -843,6 +844,91 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
public interface PeerAware
{
SocketAddress peer();
}
class DatagramReader implements SelectorUpdate, Selectable, Closeable, PeerAware
{
private final SelectableChannel _channel;
private SelectionKey _key;
private SocketAddress _peer;
DatagramReader(SelectableChannel channel)
{
_channel = channel;
}
@Override
public SocketAddress peer()
{
return _peer;
}
@Override
public void update(Selector selector)
{
try
{
_key = _channel.register(selector, SelectionKey.OP_READ, this);
if (LOG.isDebugEnabled())
LOG.debug("{} reader={}", this, _channel);
}
catch (Throwable x)
{
IO.close(_channel);
LOG.warn("Unable to register OP_READ on selector for {}", _channel, x);
}
}
@Override
public Runnable onSelected()
{
LOG.info("DatagramReader onSelected");
try
{
_peer = _selectorManager.doReadDatagram(_channel);
if (_peer != null)
{
try
{
createEndPoint(_channel, _key);
_selectorManager.onAccepted(_channel);
}
catch (Throwable x)
{
LOG.warn("createEndPoint failed for channel {}", _channel, x);
}
}
}
catch (Throwable x)
{
LOG.warn("Read failed for channel {}", _channel, x);
}
return null;
}
@Override
public void updateKey()
{
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override
public void close() throws IOException
{
// May be called from any thread.
// Implements AbstractConnector.setAccepting(boolean).
submit(selector -> _key.cancel());
}
}
class Accept implements SelectorUpdate, Runnable, Closeable
{
private final SelectableChannel channel;

View File

@ -211,6 +211,14 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
return acceptor;
}
public Closeable datagramReader(SelectableChannel server)
{
ManagedSelector selector = chooseSelector();
ManagedSelector.DatagramReader acceptor = selector.new DatagramReader(server);
selector.submit(acceptor);
return acceptor;
}
/**
* Callback method when a channel is accepted from the {@link ServerSocketChannel}
* passed to {@link #acceptor(SelectableChannel)}.
@ -350,6 +358,12 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
return ((ServerSocketChannel)server).accept();
}
protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException
{
return null;
}
/**
* <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
* <p>By default it just logs with level warning.</p>