diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index cdc0c9c32be..7c6938bd0c8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -458,11 +458,11 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint name = c.getSimpleName(); } - return String.format("%s@%h{%s<->%s,%s,fill=%s,flush=%s,to=%d/%d}", + return String.format("%s@%h{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}", name, this, - getRemoteAddress(), getLocalAddress(), + getRemoteAddress(), _state.get(), _fillInterest.toStateString(), _writeFlusher.toStateString(), diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index b3b78a81737..cd6b02cf892 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -20,12 +20,12 @@ package org.eclipse.jetty.io; import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; import java.nio.channels.CancelledKeyException; -import java.nio.channels.GatheringByteChannel; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; @@ -41,21 +41,14 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage { private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); - private final ByteChannel _channel; - private final GatheringByteChannel _gather; - protected final ManagedSelector _selector; - protected final SelectionKey _key; + private final SocketChannel _channel; + private final ManagedSelector _selector; + private final SelectionKey _key; private boolean _updatePending; - - /** - * The current value for {@link SelectionKey#interestOps()}. - */ - protected int _currentInterestOps; - - /** - * The desired value for {@link SelectionKey#interestOps()}. - */ - protected int _desiredInterestOps; + // The current value for interestOps. + private int _currentInterestOps; + // The desired value for interestOps. + private int _desiredInterestOps; private abstract class RunnableTask implements Runnable, Invocable { @@ -94,14 +87,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } } - private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate() - { - @Override - public void update(Selector selector) - { - updateKey(); - } - }; + private final ManagedSelector.SelectorUpdate _updateKeyAction = selector -> updateKey(); private final Runnable _runFillable = new RunnableCloseable("runFillable") { @@ -166,13 +152,24 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + public ChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { super(scheduler); _channel = channel; _selector = selector; _key = key; - _gather = (channel instanceof GatheringByteChannel) ? (GatheringByteChannel)channel : null; + } + + @Override + public InetSocketAddress getLocalAddress() + { + return (InetSocketAddress)_channel.socket().getLocalSocketAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return (InetSocketAddress)_channel.socket().getRemoteSocketAddress(); } @Override @@ -187,6 +184,21 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage return _channel.isOpen(); } + @Override + protected void doShutdownOutput() + { + try + { + Socket socket = _channel.socket(); + if (!socket.isOutputShutdown()) + socket.shutdownOutput(); + } + catch (IOException e) + { + LOG.debug(e); + } + } + @Override public void doClose() { @@ -254,27 +266,10 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage @Override public boolean flush(ByteBuffer... buffers) throws IOException { - long flushed = 0; + long flushed; try { - if (buffers.length == 1) - flushed = _channel.write(buffers[0]); - else if (_gather != null && buffers.length > 1) - flushed = _gather.write(buffers, 0, buffers.length); - else - { - for (ByteBuffer b : buffers) - { - if (b.hasRemaining()) - { - int l = _channel.write(b); - if (l > 0) - flushed += l; - if (b.hasRemaining()) - break; - } - } - } + flushed = _channel.write(buffers); if (LOG.isDebugEnabled()) LOG.debug("flushed {} {}", flushed, this); } @@ -295,7 +290,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage return true; } - public ByteChannel getChannel() + public SocketChannel getChannel() { return _channel; } @@ -321,9 +316,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage @Override public Runnable onSelected() { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). int readyOps = _key.readyOps(); int oldInterestOps; @@ -360,9 +354,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage @Override public void updateKey() { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ + // This method runs from the selector thread, + // possibly concurrently with changeInterests(int). try { @@ -385,22 +378,21 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } catch (CancelledKeyException x) { - LOG.debug("Ignoring key update for concurrently closed channel {}", this); + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring key update for cancelled key {}", this, x); close(); } catch (Throwable x) { - LOG.warn("Ignoring key update for " + this, x); + LOG.warn("Ignoring key update for {}", this, x); close(); } } private void changeInterests(int operation) { - /** - * This method may run concurrently with - * {@link #updateKey()} and {@link #onSelected()}. - */ + // This method runs from any thread, possibly + // concurrently with updateKey() and onSelected(). int oldInterestOps; int newInterestOps; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 8cce379672d..e8889068251 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -79,7 +79,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } private final AtomicBoolean _started = new AtomicBoolean(false); - private boolean _selecting = false; + private boolean _selecting; private final SelectorManager _selectorManager; private final int _id; private final ExecutionStrategy _strategy; @@ -123,22 +123,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable start._started.await(); } - protected void onSelectFailed(Throwable cause) - { - // override to change behavior - } - - public int size() - { - Selector s = _selector; - if (s == null) - return 0; - Set keys = s.keys(); - if (keys == null) - return 0; - return keys.size(); - } - @Override protected void doStop() throws Exception { @@ -160,6 +144,22 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable super.doStop(); } + protected void onSelectFailed(Throwable cause) + { + // override to change behavior + } + + public int size() + { + Selector s = _selector; + if (s == null) + return 0; + Set keys = s.keys(); + if (keys == null) + return 0; + return keys.size(); + } + /** * Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()} * @@ -223,7 +223,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private void processConnect(SelectionKey key, final Connect connect) + private void processConnect(SelectionKey key, Connect connect) { SelectableChannel channel = key.channel(); try @@ -278,7 +278,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable LOG.debug("Created {}", endPoint); } - public void destroyEndPoint(final EndPoint endPoint) + void destroyEndPoint(EndPoint endPoint) { // Waking up the selector is necessary to clean the // cancelled-key set and tell the TCP stack that the @@ -513,7 +513,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable else { LOG.warn(x.toString()); - LOG.debug(x); + if (LOG.isDebugEnabled()) + LOG.debug(x); } } return false; @@ -524,9 +525,10 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable while (_cursor.hasNext()) { SelectionKey key = _cursor.next(); + Object attachment = key.attachment(); + SelectableChannel channel = key.channel(); if (key.isValid()) { - Object attachment = key.attachment(); if (LOG.isDebugEnabled()) LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment); try @@ -549,24 +551,21 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (CancelledKeyException x) { - LOG.debug("Ignoring cancelled key for channel {}", key.channel()); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring cancelled key for channel {}", channel); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } catch (Throwable x) { - LOG.warn("Could not process key for channel " + key.channel(), x); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + LOG.warn("Could not process key for channel {}", channel, x); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } } else { if (LOG.isDebugEnabled()) - LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); - Object attachment = key.attachment(); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); + LOG.debug("Selector loop ignoring invalid key for channel {}", channel); + IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel); } } return null; @@ -615,7 +614,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private static class DumpKeys implements SelectorUpdate { - private CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); private List keys; @Override @@ -651,9 +650,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private final SelectableChannel _channel; private SelectionKey _key; - public Acceptor(SelectableChannel channel) + Acceptor(SelectableChannel channel) { - this._channel = channel; + _channel = channel; } @Override @@ -661,13 +660,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { try { - if (_key == null) - { - _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this); - } - + _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this); if (LOG.isDebugEnabled()) - LOG.debug("{} acceptor={}", this, _key); + LOG.debug("{} acceptor={}", this, _channel); } catch (Throwable x) { @@ -679,13 +674,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable @Override public Runnable onSelected() { - SelectableChannel server = _key.channel(); SelectableChannel channel = null; try { while (true) { - channel = _selectorManager.doAccept(server); + channel = _selectorManager.doAccept(_channel); if (channel == null) break; _selectorManager.accepted(channel); @@ -693,10 +687,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { + LOG.warn("Accept failed for channel {}", channel, x); IO.close(channel); - LOG.warn("Accept failed for channel " + channel, x); } - return null; } @@ -709,8 +702,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable public void close() throws IOException { SelectionKey key = _key; - _key = null; - if (key != null && key.isValid()) + if (key != null) key.cancel(); } } @@ -731,7 +723,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable @Override public void close() { - LOG.debug("closed accept of {}", channel); + if (LOG.isDebugEnabled()) + LOG.debug("closed accept of {}", channel); IO.close(channel); } @@ -747,7 +740,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { IO.close(channel); _selectorManager.onAcceptFailed(channel, x); - LOG.debug(x); + if (LOG.isDebugEnabled()) + LOG.debug(x); } } @@ -761,7 +755,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } catch (Throwable x) { - LOG.debug(x); + if (LOG.isDebugEnabled()) + LOG.debug(x); failed(x); } } @@ -770,9 +765,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { IO.close(channel); LOG.warn(String.valueOf(failure)); - LOG.debug(failure); + if (LOG.isDebugEnabled()) + LOG.debug(failure); _selectorManager.onAcceptFailed(channel, failure); } + + @Override + public String toString() + { + return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), channel); + } } class Connect implements SelectorUpdate, Runnable @@ -832,16 +834,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private class CloseConnections implements SelectorUpdate { - final Set _closed; - final CountDownLatch _noEndPoints = new CountDownLatch(1); - final CountDownLatch _complete = new CountDownLatch(1); + private final Set _closed; + private final CountDownLatch _complete = new CountDownLatch(1); - public CloseConnections() + private CloseConnections() { this(null); } - public CloseConnections(Set closed) + private CloseConnections(Set closed) { _closed = closed; } @@ -851,7 +852,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { if (LOG.isDebugEnabled()) LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this); - boolean zero = true; for (SelectionKey key : selector.keys()) { if (key != null && key.isValid()) @@ -860,14 +860,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable Object attachment = key.attachment(); if (attachment instanceof EndPoint) { - EndPoint endp = (EndPoint)attachment; - if (!endp.isOutputShutdown()) - zero = false; - Connection connection = endp.getConnection(); + EndPoint endPoint = (EndPoint)attachment; + Connection connection = endPoint.getConnection(); if (connection != null) closeable = connection; else - closeable = endp; + closeable = endPoint; } if (closeable != null) @@ -884,30 +882,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } } - - if (zero) - _noEndPoints.countDown(); _complete.countDown(); } } private class StopSelector implements SelectorUpdate { - CountDownLatch _stopped = new CountDownLatch(1); + private final CountDownLatch _stopped = new CountDownLatch(1); @Override public void update(Selector selector) { for (SelectionKey key : selector.keys()) { - if (key != null && key.isValid()) - { - Object attachment = key.attachment(); - if (attachment instanceof EndPoint) - IO.close((EndPoint)attachment); - } + // Key may be null when using the UnixSocket selector. + if (key == null) + continue; + Object attachment = key.attachment(); + if (attachment instanceof Closeable) + IO.close((Closeable)attachment); } - _selector = null; IO.close(selector); _stopped.countDown(); @@ -936,7 +930,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { IO.close(_connect.channel); LOG.warn(String.valueOf(failure)); - LOG.debug(failure); + if (LOG.isDebugEnabled()) + LOG.debug(failure); _connect.failed(failure); } } @@ -944,7 +939,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable @Override public String toString() { - return String.format("CreateEndPoint@%x{%s,%s}", hashCode(), _connect, _key); + return String.format("CreateEndPoint@%x{%s}", hashCode(), _connect); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 038dbecaef3..4ef5240ca9b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -214,7 +214,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump */ public void accept(SelectableChannel channel, Object attachment) { - final ManagedSelector selector = chooseSelector(); + ManagedSelector selector = chooseSelector(); selector.submit(selector.new Accept(channel, attachment)); } @@ -229,7 +229,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump */ public Closeable acceptor(SelectableChannel server) { - final ManagedSelector selector = chooseSelector(); + ManagedSelector selector = chooseSelector(); ManagedSelector.Acceptor acceptor = selector.new Acceptor(server); selector.submit(acceptor); return acceptor; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java index 51d2a562aee..3fe5467d144 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java @@ -18,24 +18,15 @@ package org.eclipse.jetty.io; -import java.io.IOException; -import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; public class SocketChannelEndPoint extends ChannelEndPoint { - private static final Logger LOG = Log.getLogger(SocketChannelEndPoint.class); - private final Socket _socket; - private final InetSocketAddress _local; - private final InetSocketAddress _remote; - public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { this((SocketChannel)channel, selector, key, scheduler); @@ -44,40 +35,10 @@ public class SocketChannelEndPoint extends ChannelEndPoint public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { super(channel, selector, key, scheduler); - - _socket = channel.socket(); - _local = (InetSocketAddress)_socket.getLocalSocketAddress(); - _remote = (InetSocketAddress)_socket.getRemoteSocketAddress(); } public Socket getSocket() { - return _socket; - } - - @Override - public InetSocketAddress getLocalAddress() - { - return _local; - } - - @Override - public InetSocketAddress getRemoteAddress() - { - return _remote; - } - - @Override - protected void doShutdownOutput() - { - try - { - if (!_socket.isOutputShutdown()) - _socket.shutdownOutput(); - } - catch (IOException e) - { - LOG.debug(e); - } + return getChannel().socket(); } } diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java index fb8cdacaea1..c6566a51ad0 100644 --- a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java +++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java @@ -33,12 +33,15 @@ public class UnixSocketEndPoint extends ChannelEndPoint { private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class); - private final UnixSocketChannel _channel; - public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { super(channel, selector, key, scheduler); - _channel = channel; + } + + @Override + public UnixSocketChannel getChannel() + { + return (UnixSocketChannel)super.getChannel(); } @Override @@ -56,11 +59,9 @@ public class UnixSocketEndPoint extends ChannelEndPoint @Override protected void doShutdownOutput() { - if (LOG.isDebugEnabled()) - LOG.debug("oshut {}", this); try { - _channel.shutdownOutput(); + getChannel().shutdownOutput(); super.doShutdownOutput(); } catch (IOException e) diff --git a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java index 9db72df33ce..ca025a9893a 100644 --- a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java +++ b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketTest.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledOnOs; @@ -145,6 +146,7 @@ public class UnixSocketTest assertThat(contentResponse.getContentAsString(), containsString("Hello World")); } + @Tag("external") @Test public void testNotLocal() throws Exception { diff --git a/tests/test-integration/src/test/resources/jetty-logging.properties b/tests/test-integration/src/test/resources/jetty-logging.properties index f076e496439..8f0c83cbb6a 100644 --- a/tests/test-integration/src/test/resources/jetty-logging.properties +++ b/tests/test-integration/src/test/resources/jetty-logging.properties @@ -1,5 +1,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog #org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog -org.eclipse.jetty.LEVEL=WARN #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.websocket.LEVEL=DEBUG