diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java new file mode 100644 index 00000000000..ca05038f7d5 --- /dev/null +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java @@ -0,0 +1,81 @@ +package org.eclipse.jetty.http3.server; + +import java.util.EventListener; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; + +public class QuicConnection implements Connection +{ + @Override + public void addEventListener(EventListener listener) + { + + } + + @Override + public void removeEventListener(EventListener listener) + { + + } + + @Override + public void onOpen() + { + + } + + @Override + public void onClose(Throwable cause) + { + + } + + @Override + public EndPoint getEndPoint() + { + return null; + } + + @Override + public void close() + { + + } + + @Override + public boolean onIdleExpired() + { + return false; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getCreatedTimeStamp() + { + return 0; + } +} diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java index 7af6f38b1c6..bbd2d17579a 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java @@ -3,11 +3,14 @@ package org.eclipse.jetty.http3.server; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.util.EventListener; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; @@ -17,6 +20,8 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.AbstractNetworkConnector; import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.annotation.Name; @@ -24,9 +29,11 @@ import org.eclipse.jetty.util.thread.Scheduler; public class ServerDatagramConnector extends AbstractNetworkConnector { + private final ConcurrentMap _acceptedChannels = new ConcurrentHashMap<>(); private final SelectorManager _manager; private volatile DatagramChannel _datagramChannel; private volatile int _localPort = -1; + private Closeable _acceptor; public ServerDatagramConnector( @Name("server") Server server, @@ -60,11 +67,13 @@ public class ServerDatagramConnector extends AbstractNetworkConnector for (EventListener l : getBeans(SelectorManager.SelectorManagerListener.class)) _manager.addEventListener(l); super.doStart(); + _acceptor = _manager.datagramReader(_datagramChannel); } @Override protected void doStop() throws Exception { + IO.close(_acceptor); super.doStop(); for (EventListener l : getBeans(EventListener.class)) _manager.removeEventListener(l); @@ -145,7 +154,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector throw new UnsupportedOperationException(getClass().getSimpleName() + " has no accept mechanism"); } - public class ServerDatagramSelectorManager extends SelectorManager + private class ServerDatagramSelectorManager extends SelectorManager { protected ServerDatagramSelectorManager(Executor executor, Scheduler scheduler, int selectors) { @@ -155,12 +164,40 @@ public class ServerDatagramConnector extends AbstractNetworkConnector @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { - return null; + ManagedSelector.PeerAware attachment = (ManagedSelector.PeerAware)selectionKey.attachment(); + ServerDatagramEndPoint serverDatagramEndPoint = _acceptedChannels.get(attachment.peer()); + serverDatagramEndPoint.init(selector, selectionKey); + return serverDatagramEndPoint; } @Override public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException { + //TODO: return quic connection + //return new QuicConnection(); + return new HttpConnection(new HttpConfiguration(), ServerDatagramConnector.this, endpoint, false); + } + + @Override + protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException + { + ByteBuffer buffer = getByteBufferPool().acquire(1200, true); + LOG.info("doReadDatagram {}", channel); + DatagramChannel datagramChannel = (DatagramChannel)channel; + SocketAddress peer = datagramChannel.receive(buffer); + SocketAddress localAddress = datagramChannel.getLocalAddress(); + + boolean[] created = new boolean[1]; + ServerDatagramEndPoint endPoint = _acceptedChannels.computeIfAbsent(peer, remoteAddress -> + { + ServerDatagramEndPoint endp = new ServerDatagramEndPoint(localAddress, remoteAddress, buffer, datagramChannel); + created[0] = true; + return endp; + }); + + if (created[0]) + return peer; + endPoint.onData(buffer); return null; } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java new file mode 100644 index 00000000000..4fe6ab2e811 --- /dev/null +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java @@ -0,0 +1,171 @@ +package org.eclipse.jetty.http3.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadPendingException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.WritePendingException; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServerDatagramEndPoint implements EndPoint +{ + private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class); + + public ServerDatagramEndPoint(SocketAddress localAddress, SocketAddress remoteAddress, ByteBuffer buffer, SelectableChannel channel) + { + + } + + public void init(ManagedSelector selector, SelectionKey selectionKey) + { + + } + + public void onData(ByteBuffer buffer) + { + + } + + @Override + public InetSocketAddress getLocalAddress() + { + throw new UnsupportedOperationException(); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOpen() + { + return false; + } + + @Override + public long getCreatedTimeStamp() + { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdownOutput() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOutputShutdown() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isInputShutdown() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close(Throwable cause) + { + throw new UnsupportedOperationException(); + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean flush(ByteBuffer... buffer) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public Object getTransport() + { + throw new UnsupportedOperationException(); + } + + @Override + public long getIdleTimeout() + { + throw new UnsupportedOperationException(); + } + + @Override + public void setIdleTimeout(long idleTimeout) + { + throw new UnsupportedOperationException(); + } + + @Override + public void fillInterested(Callback callback) throws ReadPendingException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryFillInterested(Callback callback) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFillInterested() + { + throw new UnsupportedOperationException(); + } + + @Override + public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException + { + throw new UnsupportedOperationException(); + } + + @Override + public Connection getConnection() + { + return connection; + } + + private Connection connection; + + @Override + public void setConnection(Connection connection) + { + this.connection = connection; + } + + @Override + public void onOpen() + { + LOG.info("onOpen"); + } + + @Override + public void onClose(Throwable cause) + { + throw new UnsupportedOperationException(); + } + + @Override + public void upgrade(Connection newConnection) + { + throw new UnsupportedOperationException(); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index cd58b4c5ce2..292266c5bfc 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.io; import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; +import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedSelectorException; @@ -843,6 +844,91 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } + public interface PeerAware + { + SocketAddress peer(); + } + + class DatagramReader implements SelectorUpdate, Selectable, Closeable, PeerAware + { + private final SelectableChannel _channel; + private SelectionKey _key; + private SocketAddress _peer; + + DatagramReader(SelectableChannel channel) + { + _channel = channel; + } + + @Override + public SocketAddress peer() + { + return _peer; + } + + @Override + public void update(Selector selector) + { + try + { + _key = _channel.register(selector, SelectionKey.OP_READ, this); + if (LOG.isDebugEnabled()) + LOG.debug("{} reader={}", this, _channel); + } + catch (Throwable x) + { + IO.close(_channel); + LOG.warn("Unable to register OP_READ on selector for {}", _channel, x); + } + } + + @Override + public Runnable onSelected() + { + LOG.info("DatagramReader onSelected"); + try + { + _peer = _selectorManager.doReadDatagram(_channel); + if (_peer != null) + { + try + { + createEndPoint(_channel, _key); + _selectorManager.onAccepted(_channel); + } + catch (Throwable x) + { + LOG.warn("createEndPoint failed for channel {}", _channel, x); + } + } + } + catch (Throwable x) + { + LOG.warn("Read failed for channel {}", _channel, x); + } + return null; + } + + @Override + public void updateKey() + { + } + + @Override + public void replaceKey(SelectionKey newKey) + { + _key = newKey; + } + + @Override + public void close() throws IOException + { + // May be called from any thread. + // Implements AbstractConnector.setAccepting(boolean). + submit(selector -> _key.cancel()); + } + } + class Accept implements SelectorUpdate, Runnable, Closeable { private final SelectableChannel channel; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 70e16c491d6..880bcc68083 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -211,6 +211,14 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump return acceptor; } + public Closeable datagramReader(SelectableChannel server) + { + ManagedSelector selector = chooseSelector(); + ManagedSelector.DatagramReader acceptor = selector.new DatagramReader(server); + selector.submit(acceptor); + return acceptor; + } + /** * Callback method when a channel is accepted from the {@link ServerSocketChannel} * passed to {@link #acceptor(SelectableChannel)}. @@ -350,6 +358,12 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump return ((ServerSocketChannel)server).accept(); } + protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException + { + return null; + } + + /** *

Callback method invoked when a non-blocking connect cannot be completed.

*

By default it just logs with level warning.