align ServerDatagramEndPoint with SocketChannelEndPoint

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-17 14:49:41 +01:00 committed by Simone Bordet
parent aaf0239911
commit fc25540331
4 changed files with 190 additions and 236 deletions

View File

@ -257,7 +257,7 @@ public class QuicSession
if (quicheConnection.isConnectionClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("quiche connection closed");
LOG.debug("quiche connection is in closed state");
QuicSession.this.close();
}
return Action.IDLE;

View File

@ -174,7 +174,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new ServerDatagramEndPoint(getScheduler(), (DatagramChannel)channel, selector, selectionKey);
return new ServerDatagramEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
}
@Override

View File

@ -19,31 +19,72 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ReadPendingException;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeoutException;
import java.nio.channels.Selector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, ManagedSelector.Selectable
public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
private final long createdTimeStamp = System.currentTimeMillis();
private final AutoLock _lock = new AutoLock();
private final DatagramChannel _channel;
private final ManagedSelector _selector;
private SelectionKey _key;
private boolean _updatePending;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable
{
final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("%s:%s:%s", ServerDatagramEndPoint.this, _operation, getInvocationType());
}
}
private abstract class RunnableCloseable extends ServerDatagramEndPoint.RunnableTask implements Closeable
{
protected RunnableCloseable(String op)
{
super(op);
}
@Override
public void close()
{
try
{
ServerDatagramEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", ServerDatagramEndPoint.this, x);
}
}
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new ServerDatagramEndPoint.RunnableCloseable("runFillable")
{
@Override
@ -107,64 +148,18 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man
}
};
private final FillInterest fillInterest = new FillInterest()
{
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
};
private final WriteFlusher writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
};
public FillInterest getFillInterest()
{
return fillInterest;
}
public WriteFlusher getWriteFlusher()
{
return writeFlusher;
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = s -> updateKey();
private final DatagramChannel channel;
private final ManagedSelector _selector;
private Connection connection;
private boolean open;
private SelectionKey _key;
private boolean _updatePending;
private int _currentInterestOps;
private int _desiredInterestOps;
public ServerDatagramEndPoint(Scheduler scheduler, DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey)
public ServerDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
this.channel = channel;
this._selector = selector;
this._key = selectionKey;
_channel = channel;
_selector = selector;
_key = key;
}
@Override
public InetSocketAddress getLocalAddress()
{
try
{
return (InetSocketAddress)channel.getLocalAddress();
}
catch (Throwable x)
{
return null;
}
return (InetSocketAddress)_channel.socket().getLocalSocketAddress();
}
@Override
@ -176,46 +171,57 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man
@Override
public boolean isOpen()
{
return open;
return _channel.isOpen();
}
@Override
public long getCreatedTimeStamp()
protected void doShutdownOutput()
{
return createdTimeStamp;
}
@Override
public void shutdownOutput()
public void doClose()
{
throw new UnsupportedOperationException();
if (LOG.isDebugEnabled())
LOG.debug("doClose {}", this);
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug("Unable to close channel", e);
}
finally
{
super.doClose();
}
}
@Override
public boolean isOutputShutdown()
public void onClose(Throwable cause)
{
throw new UnsupportedOperationException();
}
@Override
public boolean isInputShutdown()
{
throw new UnsupportedOperationException();
}
@Override
public void close(Throwable cause)
{
LOG.info("closed endpoint");
try
{
super.onClose(cause);
}
finally
{
if (_selector != null)
_selector.destroyEndPoint(this, cause);
}
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
if (isInputShutdown())
return -1;
int pos = BufferUtil.flipToFill(buffer);
buffer.position(pos + AddressCodec.ENCODED_ADDRESS_LENGTH);
InetSocketAddress peer = (InetSocketAddress)channel.receive(buffer);
InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer);
if (peer == null)
{
buffer.position(pos);
@ -223,41 +229,118 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man
return 0;
}
notIdle();
int finalPosition = buffer.position();
buffer.position(pos);
AddressCodec.encodeInetSocketAddress(buffer, peer);
buffer.position(finalPosition);
BufferUtil.flipToFlush(buffer, pos);
return finalPosition - AddressCodec.ENCODED_ADDRESS_LENGTH;
int filled = finalPosition - AddressCodec.ENCODED_ADDRESS_LENGTH;
if (LOG.isDebugEnabled())
LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer));
return filled;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
InetSocketAddress peer = AddressCodec.decodeInetSocketAddress(buffers[0]);
for (int i = 1; i < buffers.length; i++)
long flushed = 0;
try
{
ByteBuffer buffer = buffers[i];
int sent = channel.send(buffer, peer);
if (sent == 0)
InetSocketAddress peer = AddressCodec.decodeInetSocketAddress(buffers[0]);
for (int i = 1; i < buffers.length; i++)
{
ByteBuffer buffer = buffers[i];
int sent = _channel.send(buffer, peer);
if (sent == 0)
break;
flushed += sent;
}
if (LOG.isDebugEnabled())
LOG.debug("flushed {} {}", flushed, this);
}
catch (IOException e)
{
throw new EofException(e);
}
if (flushed > 0)
notIdle();
for (ByteBuffer b : buffers)
{
if (!BufferUtil.isEmpty(b))
return false;
}
return true;
}
public DatagramChannel getChannel()
{
return _channel;
}
@Override
public Object getTransport()
{
return this.channel;
return _channel;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
protected void needsFillInterest()
{
// TODO: close the channel.
LOG.info("idle timeout", timeout);
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
@Override
public Runnable onSelected()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override
@ -301,7 +384,7 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man
@Override
public void replaceKey(SelectionKey newKey)
{
this._key = newKey;
_key = newKey;
}
private void changeInterests(int operation)
@ -329,143 +412,14 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man
}
@Override
public Runnable onSelected()
public String toEndPointString()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
@Override
public void fillInterested(Callback callback) throws ReadPendingException
{
fillInterest.register(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
return fillInterest.tryRegister(callback);
}
@Override
public boolean isFillInterested()
{
return fillInterest.isInterested();
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
writeFlusher.write(callback, buffers);
}
@Override
public Connection getConnection()
{
return connection;
}
@Override
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public void onOpen()
{
super.onOpen();
open = true;
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", this);
}
@Override
public void onClose()
{
super.onClose();
onClose(null);
}
@Override
public void onClose(Throwable cause)
{
open = false;
if (LOG.isDebugEnabled())
LOG.debug("onClose {}", this);
}
@Override
public void upgrade(Connection newConnection)
{
throw new UnsupportedOperationException();
}
private abstract class RunnableTask implements Runnable, Invocable
{
final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("%s:%s:%s", ServerDatagramEndPoint.this, _operation, getInvocationType());
}
}
private abstract class RunnableCloseable extends ServerDatagramEndPoint.RunnableTask implements Closeable
{
protected RunnableCloseable(String op)
{
super(op);
}
@Override
public void close()
{
try
{
ServerDatagramEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", ServerDatagramEndPoint.this, x);
}
}
// We do a best effort to print the right toString() and that's it.
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toEndPointString(),
_currentInterestOps,
_desiredInterestOps,
ManagedSelector.safeInterestOps(_key),
ManagedSelector.safeReadyOps(_key));
}
}

View File

@ -403,7 +403,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Created {}", endPoint);
}
void destroyEndPoint(EndPoint endPoint, Throwable cause)
public void destroyEndPoint(EndPoint endPoint, Throwable cause)
{
// Waking up the selector is necessary to clean the
// cancelled-key set and tell the TCP stack that the
@ -420,7 +420,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
static int safeReadyOps(SelectionKey selectionKey)
public static int safeReadyOps(SelectionKey selectionKey)
{
try
{
@ -433,7 +433,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
static int safeInterestOps(SelectionKey selectionKey)
public static int safeInterestOps(SelectionKey selectionKey)
{
try
{