From fc255403317737c3db26ccfd798334f85addd324 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 17 Mar 2021 14:49:41 +0100 Subject: [PATCH] align ServerDatagramEndPoint with SocketChannelEndPoint Signed-off-by: Ludovic Orban --- .../jetty/http3/server/QuicSession.java | 2 +- .../http3/server/ServerDatagramConnector.java | 2 +- .../http3/server/ServerDatagramEndPoint.java | 416 ++++++++---------- .../org/eclipse/jetty/io/ManagedSelector.java | 6 +- 4 files changed, 190 insertions(+), 236 deletions(-) diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java index b364d684b9b..8ee6d295c74 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java @@ -257,7 +257,7 @@ public class QuicSession if (quicheConnection.isConnectionClosed()) { if (LOG.isDebugEnabled()) - LOG.debug("quiche connection closed"); + LOG.debug("quiche connection is in closed state"); QuicSession.this.close(); } return Action.IDLE; diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java index 1af4ef1f037..f48aea90dce 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramConnector.java @@ -174,7 +174,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { - return new ServerDatagramEndPoint(getScheduler(), (DatagramChannel)channel, selector, selectionKey); + return new ServerDatagramEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler()); } @Override diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java index d6d2654f3aa..62a617f06dd 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/ServerDatagramEndPoint.java @@ -19,31 +19,72 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.DatagramChannel; -import java.nio.channels.ReadPendingException; import java.nio.channels.SelectionKey; -import java.nio.channels.WritePendingException; -import java.util.concurrent.TimeoutException; +import java.nio.channels.Selector; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.FillInterest; -import org.eclipse.jetty.io.IdleTimeout; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.ManagedSelector; -import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, ManagedSelector.Selectable +public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable { private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class); - private final long createdTimeStamp = System.currentTimeMillis(); private final AutoLock _lock = new AutoLock(); + private final DatagramChannel _channel; + private final ManagedSelector _selector; + private SelectionKey _key; + private boolean _updatePending; + // The current value for interestOps. + private int _currentInterestOps; + // The desired value for interestOps. + private int _desiredInterestOps; + + private abstract class RunnableTask implements Runnable, Invocable + { + final String _operation; + + protected RunnableTask(String op) + { + _operation = op; + } + + @Override + public String toString() + { + return String.format("%s:%s:%s", ServerDatagramEndPoint.this, _operation, getInvocationType()); + } + } + + private abstract class RunnableCloseable extends ServerDatagramEndPoint.RunnableTask implements Closeable + { + protected RunnableCloseable(String op) + { + super(op); + } + + @Override + public void close() + { + try + { + ServerDatagramEndPoint.this.close(); + } + catch (Throwable x) + { + LOG.warn("Unable to close {}", ServerDatagramEndPoint.this, x); + } + } + } + + private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction; + private final Runnable _runFillable = new ServerDatagramEndPoint.RunnableCloseable("runFillable") { @Override @@ -107,64 +148,18 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man } }; - private final FillInterest fillInterest = new FillInterest() - { - @Override - protected void needsFillInterest() - { - changeInterests(SelectionKey.OP_READ); - } - }; - private final WriteFlusher writeFlusher = new WriteFlusher(this) - { - @Override - protected void onIncompleteFlush() - { - changeInterests(SelectionKey.OP_WRITE); - } - }; - - public FillInterest getFillInterest() - { - return fillInterest; - } - - public WriteFlusher getWriteFlusher() - { - return writeFlusher; - } - - private final ManagedSelector.SelectorUpdate _updateKeyAction = s -> updateKey(); - - private final DatagramChannel channel; - private final ManagedSelector _selector; - private Connection connection; - private boolean open; - - private SelectionKey _key; - private boolean _updatePending; - private int _currentInterestOps; - private int _desiredInterestOps; - - public ServerDatagramEndPoint(Scheduler scheduler, DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey) + public ServerDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { super(scheduler); - this.channel = channel; - this._selector = selector; - this._key = selectionKey; + _channel = channel; + _selector = selector; + _key = key; } @Override public InetSocketAddress getLocalAddress() { - try - { - return (InetSocketAddress)channel.getLocalAddress(); - } - catch (Throwable x) - { - return null; - } + return (InetSocketAddress)_channel.socket().getLocalSocketAddress(); } @Override @@ -176,46 +171,57 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man @Override public boolean isOpen() { - return open; + return _channel.isOpen(); } @Override - public long getCreatedTimeStamp() + protected void doShutdownOutput() { - return createdTimeStamp; } @Override - public void shutdownOutput() + public void doClose() { - throw new UnsupportedOperationException(); + if (LOG.isDebugEnabled()) + LOG.debug("doClose {}", this); + try + { + _channel.close(); + } + catch (IOException e) + { + LOG.debug("Unable to close channel", e); + } + finally + { + super.doClose(); + } } @Override - public boolean isOutputShutdown() + public void onClose(Throwable cause) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isInputShutdown() - { - throw new UnsupportedOperationException(); - } - - @Override - public void close(Throwable cause) - { - LOG.info("closed endpoint"); + try + { + super.onClose(cause); + } + finally + { + if (_selector != null) + _selector.destroyEndPoint(this, cause); + } } @Override public int fill(ByteBuffer buffer) throws IOException { + if (isInputShutdown()) + return -1; + int pos = BufferUtil.flipToFill(buffer); buffer.position(pos + AddressCodec.ENCODED_ADDRESS_LENGTH); - InetSocketAddress peer = (InetSocketAddress)channel.receive(buffer); + InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer); if (peer == null) { buffer.position(pos); @@ -223,41 +229,118 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man return 0; } + notIdle(); int finalPosition = buffer.position(); buffer.position(pos); AddressCodec.encodeInetSocketAddress(buffer, peer); buffer.position(finalPosition); - BufferUtil.flipToFlush(buffer, pos); - return finalPosition - AddressCodec.ENCODED_ADDRESS_LENGTH; + int filled = finalPosition - AddressCodec.ENCODED_ADDRESS_LENGTH; + if (LOG.isDebugEnabled()) + LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer)); + return filled; } @Override public boolean flush(ByteBuffer... buffers) throws IOException { - InetSocketAddress peer = AddressCodec.decodeInetSocketAddress(buffers[0]); - for (int i = 1; i < buffers.length; i++) + long flushed = 0; + try { - ByteBuffer buffer = buffers[i]; - int sent = channel.send(buffer, peer); - if (sent == 0) + InetSocketAddress peer = AddressCodec.decodeInetSocketAddress(buffers[0]); + for (int i = 1; i < buffers.length; i++) + { + ByteBuffer buffer = buffers[i]; + int sent = _channel.send(buffer, peer); + if (sent == 0) + break; + flushed += sent; + } + if (LOG.isDebugEnabled()) + LOG.debug("flushed {} {}", flushed, this); + } + catch (IOException e) + { + throw new EofException(e); + } + + if (flushed > 0) + notIdle(); + + for (ByteBuffer b : buffers) + { + if (!BufferUtil.isEmpty(b)) return false; } + return true; } + public DatagramChannel getChannel() + { + return _channel; + } + @Override public Object getTransport() { - return this.channel; + return _channel; } @Override - protected void onIdleExpired(TimeoutException timeout) + protected void needsFillInterest() { - // TODO: close the channel. - LOG.info("idle timeout", timeout); + changeInterests(SelectionKey.OP_READ); + } + + @Override + protected void onIncompleteFlush() + { + changeInterests(SelectionKey.OP_WRITE); + } + + @Override + public Runnable onSelected() + { + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). + + int readyOps = _key.readyOps(); + int oldInterestOps; + int newInterestOps; + try (AutoLock l = _lock.lock()) + { + _updatePending = true; + // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps & ~readyOps; + _desiredInterestOps = newInterestOps; + } + + boolean fillable = (readyOps & SelectionKey.OP_READ) != 0; + boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0; + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this); + + // return task to complete the job + Runnable task = fillable + ? (flushable + ? _runCompleteWriteFillable + : _runFillable) + : (flushable + ? _runCompleteWrite + : null); + + if (LOG.isDebugEnabled()) + LOG.debug("task {}", task); + return task; + } + + private void updateKeyAction(Selector selector) + { + updateKey(); } @Override @@ -301,7 +384,7 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man @Override public void replaceKey(SelectionKey newKey) { - this._key = newKey; + _key = newKey; } private void changeInterests(int operation) @@ -329,143 +412,14 @@ public class ServerDatagramEndPoint extends IdleTimeout implements EndPoint, Man } @Override - public Runnable onSelected() + public String toEndPointString() { - // This method runs from the selector thread, - // possibly concurrently with changeInterests(int). - - int readyOps = _key.readyOps(); - int oldInterestOps; - int newInterestOps; - try (AutoLock l = _lock.lock()) - { - _updatePending = true; - // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). - oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps & ~readyOps; - _desiredInterestOps = newInterestOps; - } - - boolean fillable = (readyOps & SelectionKey.OP_READ) != 0; - boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0; - - if (LOG.isDebugEnabled()) - LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this); - - // return task to complete the job - Runnable task = fillable - ? (flushable - ? _runCompleteWriteFillable - : _runFillable) - : (flushable - ? _runCompleteWrite - : null); - - if (LOG.isDebugEnabled()) - LOG.debug("task {}", task); - return task; - } - - @Override - public void fillInterested(Callback callback) throws ReadPendingException - { - fillInterest.register(callback); - } - - @Override - public boolean tryFillInterested(Callback callback) - { - return fillInterest.tryRegister(callback); - } - - @Override - public boolean isFillInterested() - { - return fillInterest.isInterested(); - } - - @Override - public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException - { - writeFlusher.write(callback, buffers); - } - - @Override - public Connection getConnection() - { - return connection; - } - - @Override - public void setConnection(Connection connection) - { - this.connection = connection; - } - - @Override - public void onOpen() - { - super.onOpen(); - open = true; - if (LOG.isDebugEnabled()) - LOG.debug("onOpen {}", this); - } - - @Override - public void onClose() - { - super.onClose(); - onClose(null); - } - - @Override - public void onClose(Throwable cause) - { - open = false; - if (LOG.isDebugEnabled()) - LOG.debug("onClose {}", this); - } - - @Override - public void upgrade(Connection newConnection) - { - throw new UnsupportedOperationException(); - } - - private abstract class RunnableTask implements Runnable, Invocable - { - final String _operation; - - protected RunnableTask(String op) - { - _operation = op; - } - - @Override - public String toString() - { - return String.format("%s:%s:%s", ServerDatagramEndPoint.this, _operation, getInvocationType()); - } - } - - private abstract class RunnableCloseable extends ServerDatagramEndPoint.RunnableTask implements Closeable - { - protected RunnableCloseable(String op) - { - super(op); - } - - @Override - public void close() - { - try - { - ServerDatagramEndPoint.this.close(); - } - catch (Throwable x) - { - LOG.warn("Unable to close {}", ServerDatagramEndPoint.this, x); - } - } + // We do a best effort to print the right toString() and that's it. + return String.format("%s{io=%d/%d,kio=%d,kro=%d}", + super.toEndPointString(), + _currentInterestOps, + _desiredInterestOps, + ManagedSelector.safeInterestOps(_key), + ManagedSelector.safeReadyOps(_key)); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index eeebf06face..591b2fa6b28 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -403,7 +403,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable LOG.debug("Created {}", endPoint); } - void destroyEndPoint(EndPoint endPoint, Throwable cause) + public void destroyEndPoint(EndPoint endPoint, Throwable cause) { // Waking up the selector is necessary to clean the // cancelled-key set and tell the TCP stack that the @@ -420,7 +420,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - static int safeReadyOps(SelectionKey selectionKey) + public static int safeReadyOps(SelectionKey selectionKey) { try { @@ -433,7 +433,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - static int safeInterestOps(SelectionKey selectionKey) + public static int safeInterestOps(SelectionKey selectionKey) { try {