encode address into buffers

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-15 14:42:17 +01:00 committed by Simone Bordet
parent 340eb46b43
commit e1cd6956f4
4 changed files with 314 additions and 115 deletions

View File

@ -0,0 +1,195 @@
package org.eclipse.jetty.http3.server;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
public class DatagramAdaptingEndPoint implements EndPoint
{
private final ServerDatagramEndPoint delegate;
private InetSocketAddress remoteAddress;
public DatagramAdaptingEndPoint(ServerDatagramEndPoint delegate)
{
this.delegate = delegate;
}
@Override
public InetSocketAddress getLocalAddress()
{
return delegate.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return remoteAddress;
}
@Override
public boolean isOpen()
{
return delegate.isOpen();
}
@Override
public long getCreatedTimeStamp()
{
return delegate.getCreatedTimeStamp();
}
@Override
public void shutdownOutput()
{
delegate.shutdownOutput();
}
@Override
public boolean isOutputShutdown()
{
return delegate.isOutputShutdown();
}
@Override
public boolean isInputShutdown()
{
return delegate.isInputShutdown();
}
@Override
public void close(Throwable cause)
{
delegate.close(cause);
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
int filled = delegate.fill(buffer);
if (filled == 0)
return 0;
int headerPosition = buffer.position();
byte[] address;
byte ipVersion = buffer.get();
if (ipVersion == 4)
address = new byte[4];
else if (ipVersion == 6)
address = new byte[6];
else throw new IOException("Unsupported IP version: " + ipVersion);
buffer.get(address);
int port = buffer.getChar();
remoteAddress = new InetSocketAddress(InetAddress.getByAddress(address), port);
buffer.position(headerPosition + 19);
return filled;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
return delegate.flush(buffers);
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
ByteBuffer[] delegateBuffers = new ByteBuffer[buffers.length + 1];
System.arraycopy(buffers, 0, delegateBuffers, 1, buffers.length);
delegateBuffers[0] = ByteBuffer.allocate(19);
byte[] addressBytes = remoteAddress.getAddress().getAddress();
byte ipVersion;
if (remoteAddress.getAddress() instanceof Inet4Address)
ipVersion = 4;
else if (remoteAddress.getAddress() instanceof Inet6Address)
ipVersion = 6;
else throw new IllegalArgumentException("Unsupported address type: " + remoteAddress.getAddress().getClass());
int port = remoteAddress.getPort();
delegateBuffers[0].put(ipVersion);
delegateBuffers[0].put(addressBytes);
delegateBuffers[0].putChar((char)port);
delegateBuffers[0].position(0);
delegate.write(callback, delegateBuffers);
}
@Override
public Object getTransport()
{
return delegate.getTransport();
}
@Override
public long getIdleTimeout()
{
return delegate.getIdleTimeout();
}
@Override
public void setIdleTimeout(long idleTimeout)
{
delegate.setIdleTimeout(idleTimeout);
}
@Override
public void fillInterested(Callback callback) throws ReadPendingException
{
delegate.fillInterested(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
return delegate.tryFillInterested(callback);
}
@Override
public boolean isFillInterested()
{
return delegate.isFillInterested();
}
@Override
public Connection getConnection()
{
return delegate.getConnection();
}
@Override
public void setConnection(Connection connection)
{
delegate.setConnection(connection);
}
@Override
public void onOpen()
{
delegate.onOpen();
}
@Override
public void onClose(Throwable cause)
{
delegate.onClose(cause);
}
@Override
public void upgrade(Connection newConnection)
{
delegate.upgrade(newConnection);
}
}

View File

@ -3,15 +3,11 @@ package org.eclipse.jetty.http3.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.EventListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.HttpCompliance;
@ -25,7 +21,6 @@ import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.thread.Scheduler;
@ -153,8 +148,6 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
private class ServerDatagramSelectorManager extends SelectorManager
{
private final ConcurrentMap<SocketAddress, ServerDatagramEndPoint> _acceptedChannels = new ConcurrentHashMap<>();
protected ServerDatagramSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
@ -171,10 +164,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
ManagedSelector.PeerAware attachment = (ManagedSelector.PeerAware)selectionKey.attachment();
ServerDatagramEndPoint serverDatagramEndPoint = _acceptedChannels.get(attachment.peer());
serverDatagramEndPoint.init(selector, selectionKey);
return serverDatagramEndPoint;
return new ServerDatagramEndPoint(getScheduler(), (DatagramChannel)channel, selector, selectionKey);
}
@Override
@ -184,32 +174,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
//return new QuicConnection();
HttpConfiguration config = new HttpConfiguration();
config.setHttpCompliance(HttpCompliance.LEGACY); // enable HTTP/0.9
return new HttpConnection(config, ServerDatagramConnector.this, endpoint, false);
}
protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException
{
ByteBuffer buffer = getByteBufferPool().acquire(1200, true);
BufferUtil.flipToFill(buffer);
LOG.info("doReadDatagram {}", channel);
DatagramChannel datagramChannel = (DatagramChannel)channel;
SocketAddress peer = datagramChannel.receive(buffer);
buffer.flip();
LOG.info("doReadDatagram received {} byte(s)", buffer.remaining());
SocketAddress localAddress = datagramChannel.getLocalAddress();
boolean[] created = new boolean[1];
ServerDatagramEndPoint endPoint = _acceptedChannels.computeIfAbsent(peer, remoteAddress ->
{
ServerDatagramEndPoint endp = new ServerDatagramEndPoint(getScheduler(), localAddress, remoteAddress, buffer, datagramChannel);
created[0] = true;
return endp;
});
if (created[0])
return peer;
endPoint.onData(buffer, peer);
return null;
return new HttpConnection(config, ServerDatagramConnector.this, new DatagramAdaptingEndPoint((ServerDatagramEndPoint)endpoint), false);
}
@Override
@ -218,23 +183,17 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
return String.format("DatagramSelectorManager@%s", ServerDatagramConnector.this);
}
class DatagramReader implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Closeable, ManagedSelector.PeerAware
class DatagramReader implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Closeable
{
private final SelectableChannel _channel;
private SelectionKey _key;
private SocketAddress _peer;
private volatile boolean endPointCreated;
private volatile SelectionKey _key;
DatagramReader(SelectableChannel channel)
{
_channel = channel;
}
@Override
public SocketAddress peer()
{
return _peer;
}
@Override
public void update(Selector selector)
{
@ -255,26 +214,24 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
public Runnable onSelected()
{
LOG.info("DatagramReader onSelected");
try
if (!endPointCreated)
{
_peer = doReadDatagram(_channel);
if (_peer != null)
synchronized (this)
{
try
if (!endPointCreated)
{
chooseSelector().createEndPoint(_channel, _key);
onAccepted(_channel);
}
catch (Throwable x)
{
LOG.warn("createEndPoint failed for channel {}", _channel, x);
try
{
chooseSelector().createEndPoint(_channel, _key);
endPointCreated = true;
}
catch (Throwable x)
{
LOG.warn("createEndPoint failed for channel {}", _channel, x);
}
}
}
}
catch (Throwable x)
{
LOG.warn("Read failed for channel {}", _channel, x);
}
return null;
}

View File

@ -1,6 +1,9 @@
package org.eclipse.jetty.http3.server;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@ -9,6 +12,7 @@ import java.nio.channels.ReadPendingException;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -21,7 +25,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
@ -33,46 +37,36 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
}
};
private final DatagramChannel channel;
private final SocketAddress localAddress;
private SocketAddress remoteAddress;
private ManagedSelector selector;
private SelectionKey selectionKey;
private final ManagedSelector selector;
private final SelectionKey selectionKey;
private Connection connection;
private ByteBuffer data;
private boolean open;
public ServerDatagramEndPoint(Scheduler scheduler, SocketAddress localAddress, SocketAddress remoteAddress, ByteBuffer buffer, DatagramChannel channel)
public ServerDatagramEndPoint(Scheduler scheduler, DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
super(scheduler);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
this.channel = channel;
this.data = buffer;
}
public void init(ManagedSelector selector, SelectionKey selectionKey)
{
this.selector = selector;
this.selectionKey = selectionKey;
}
public void onData(ByteBuffer buffer, SocketAddress peer)
{
this.remoteAddress = peer;
this.data = buffer;
fillInterest.fillable();
}
@Override
public InetSocketAddress getLocalAddress()
{
return (InetSocketAddress)localAddress;
try
{
return (InetSocketAddress)channel.getLocalAddress();
}
catch (IOException e)
{
return null;
}
}
@Override
public InetSocketAddress getRemoteAddress()
{
return (InetSocketAddress)remoteAddress;
return null;
}
@Override
@ -114,27 +108,63 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
@Override
public int fill(ByteBuffer buffer) throws IOException
{
if (data != null)
BufferUtil.flipToFill(buffer);
int headerPosition = buffer.position();
buffer.position(buffer.position() + 19);
InetSocketAddress peer = (InetSocketAddress)channel.receive(buffer);
if (peer == null)
{
int before = data.remaining();
LOG.info("fill; bytes remaining: {} byte(s)", before);
BufferUtil.flipToFill(buffer);
buffer.put(data);
buffer.position(0);
buffer.flip();
int after = data.remaining();
if (after == 0)
data = null;
int filled = before - after;
LOG.info("filled {} byte(s)", filled);
return filled;
return 0;
}
return 0;
int finalPosition = buffer.position();
byte[] addressBytes = peer.getAddress().getAddress();
int port = peer.getPort();
byte ipVersion;
if (peer.getAddress() instanceof Inet4Address)
ipVersion = 4;
else if (peer.getAddress() instanceof Inet6Address)
ipVersion = 6;
else throw new IOException("Unsupported address type: " + peer.getAddress().getClass());
buffer.position(headerPosition);
buffer.put(ipVersion);
buffer.put(addressBytes);
buffer.putChar((char)port);
buffer.position(finalPosition);
buffer.flip();
return finalPosition - 19;
}
@Override
public boolean flush(ByteBuffer... buffer) throws IOException
public boolean flush(ByteBuffer... buffers) throws IOException
{
throw new UnsupportedOperationException();
ByteBuffer headerBuffer = buffers[0];
byte ipVersion = headerBuffer.get();
byte[] address;
if (ipVersion == 4)
address = new byte[4];
else if (ipVersion == 6)
address = new byte[16];
else throw new IOException("Unsupported IP version: " + ipVersion);
headerBuffer.get(address);
int port = headerBuffer.getChar();
InetSocketAddress peer = new InetSocketAddress(InetAddress.getByAddress(address), port);
for (int i = 1; i < buffers.length; i++)
{
ByteBuffer buffer = buffers[i];
int sent = channel.send(buffer, peer);
if (sent == 0)
return false;
}
return true;
}
@Override
@ -149,11 +179,36 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
LOG.info("idle timeout", timeout);
}
//TODO: this is racy
private final AtomicBoolean fillable = new AtomicBoolean();
@Override
public Runnable onSelected()
{
return () ->
{
if (!fillInterest.fillable())
fillable.set(true);
};
}
@Override
public void updateKey()
{
// TODO: change interest?
}
@Override
public void replaceKey(SelectionKey newKey)
{
throw new UnsupportedOperationException();
}
@Override
public void fillInterested(Callback callback) throws ReadPendingException
{
fillInterest.register(callback);
if (data != null)
if (fillable.getAndSet(false))
fillInterest.fillable();
}
@ -161,7 +216,7 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
public boolean tryFillInterested(Callback callback)
{
boolean registered = fillInterest.tryRegister(callback);
if (registered && data != null)
if (registered && fillable.getAndSet(false))
fillInterest.fillable();
return registered;
}
@ -175,19 +230,16 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint
@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
for (ByteBuffer buffer : buffers)
try
{
try
{
int sent = channel.send(buffer, getRemoteAddress());
}
catch (IOException e)
{
callback.failed(e);
return;
}
boolean done = flush(buffers);
if (done)
callback.succeeded();
}
catch (IOException e)
{
callback.failed(e);
}
callback.succeeded();
}
@Override

View File

@ -844,11 +844,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
public interface PeerAware
{
SocketAddress peer();
}
class Accept implements SelectorUpdate, Runnable, Closeable
{
private final SelectableChannel channel;