Code cleanups.

This commit is contained in:
Simone Bordet 2020-04-27 22:57:57 +02:00
parent 32aa5d6453
commit 46ad90598c
8 changed files with 134 additions and 184 deletions

View File

@ -458,11 +458,11 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
name = c.getSimpleName();
}
return String.format("%s@%h{%s<->%s,%s,fill=%s,flush=%s,to=%d/%d}",
return String.format("%s@%h{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}",
name,
this,
getRemoteAddress(),
getLocalAddress(),
getRemoteAddress(),
_state.get(),
_fillInterest.toStateString(),
_writeFlusher.toStateString(),

View File

@ -20,12 +20,12 @@ package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -41,21 +41,14 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
{
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
private final ByteChannel _channel;
private final GatheringByteChannel _gather;
protected final ManagedSelector _selector;
protected final SelectionKey _key;
private final SocketChannel _channel;
private final ManagedSelector _selector;
private final SelectionKey _key;
private boolean _updatePending;
/**
* The current value for {@link SelectionKey#interestOps()}.
*/
protected int _currentInterestOps;
/**
* The desired value for {@link SelectionKey#interestOps()}.
*/
protected int _desiredInterestOps;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable
{
@ -94,14 +87,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate()
{
@Override
public void update(Selector selector)
{
updateKey();
}
};
private final ManagedSelector.SelectorUpdate _updateKeyAction = selector -> updateKey();
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@ -166,13 +152,24 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
};
public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
public ChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel = channel;
_selector = selector;
_key = key;
_gather = (channel instanceof GatheringByteChannel) ? (GatheringByteChannel)channel : null;
}
@Override
public InetSocketAddress getLocalAddress()
{
return (InetSocketAddress)_channel.socket().getLocalSocketAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return (InetSocketAddress)_channel.socket().getRemoteSocketAddress();
}
@Override
@ -187,6 +184,21 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return _channel.isOpen();
}
@Override
protected void doShutdownOutput()
{
try
{
Socket socket = _channel.socket();
if (!socket.isOutputShutdown())
socket.shutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
}
@Override
public void doClose()
{
@ -254,27 +266,10 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
long flushed = 0;
long flushed;
try
{
if (buffers.length == 1)
flushed = _channel.write(buffers[0]);
else if (_gather != null && buffers.length > 1)
flushed = _gather.write(buffers, 0, buffers.length);
else
{
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l = _channel.write(b);
if (l > 0)
flushed += l;
if (b.hasRemaining())
break;
}
}
}
flushed = _channel.write(buffers);
if (LOG.isDebugEnabled())
LOG.debug("flushed {} {}", flushed, this);
}
@ -295,7 +290,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return true;
}
public ByteChannel getChannel()
public SocketChannel getChannel()
{
return _channel;
}
@ -321,9 +316,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override
public Runnable onSelected()
{
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
@ -360,9 +354,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override
public void updateKey()
{
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
try
{
@ -385,22 +378,21 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for " + this, x);
LOG.warn("Ignoring key update for {}", this, x);
close();
}
}
private void changeInterests(int operation)
{
/**
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
// This method runs from any thread, possibly
// concurrently with updateKey() and onSelected().
int oldInterestOps;
int newInterestOps;

View File

@ -79,7 +79,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
private final AtomicBoolean _started = new AtomicBoolean(false);
private boolean _selecting = false;
private boolean _selecting;
private final SelectorManager _selectorManager;
private final int _id;
private final ExecutionStrategy _strategy;
@ -123,22 +123,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
start._started.await();
}
protected void onSelectFailed(Throwable cause)
{
// override to change behavior
}
public int size()
{
Selector s = _selector;
if (s == null)
return 0;
Set<SelectionKey> keys = s.keys();
if (keys == null)
return 0;
return keys.size();
}
@Override
protected void doStop() throws Exception
{
@ -160,6 +144,22 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
super.doStop();
}
protected void onSelectFailed(Throwable cause)
{
// override to change behavior
}
public int size()
{
Selector s = _selector;
if (s == null)
return 0;
Set<SelectionKey> keys = s.keys();
if (keys == null)
return 0;
return keys.size();
}
/**
* Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()}
*
@ -223,7 +223,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
private void processConnect(SelectionKey key, final Connect connect)
private void processConnect(SelectionKey key, Connect connect)
{
SelectableChannel channel = key.channel();
try
@ -278,7 +278,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Created {}", endPoint);
}
public void destroyEndPoint(final EndPoint endPoint)
void destroyEndPoint(EndPoint endPoint)
{
// Waking up the selector is necessary to clean the
// cancelled-key set and tell the TCP stack that the
@ -513,7 +513,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
else
{
LOG.warn(x.toString());
LOG.debug(x);
if (LOG.isDebugEnabled())
LOG.debug(x);
}
}
return false;
@ -524,9 +525,10 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
while (_cursor.hasNext())
{
SelectionKey key = _cursor.next();
Object attachment = key.attachment();
SelectableChannel channel = key.channel();
if (key.isValid())
{
Object attachment = key.attachment();
if (LOG.isDebugEnabled())
LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment);
try
@ -549,24 +551,21 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
IO.close((EndPoint)attachment);
if (LOG.isDebugEnabled())
LOG.debug("Ignoring cancelled key for channel {}", channel);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
IO.close((EndPoint)attachment);
LOG.warn("Could not process key for channel {}", channel, x);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
IO.close((EndPoint)attachment);
LOG.debug("Selector loop ignoring invalid key for channel {}", channel);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
}
return null;
@ -615,7 +614,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private static class DumpKeys implements SelectorUpdate
{
private CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch = new CountDownLatch(1);
private List<String> keys;
@Override
@ -651,9 +650,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private final SelectableChannel _channel;
private SelectionKey _key;
public Acceptor(SelectableChannel channel)
Acceptor(SelectableChannel channel)
{
this._channel = channel;
_channel = channel;
}
@Override
@ -661,13 +660,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
try
{
if (_key == null)
{
_key = _channel.register(selector, SelectionKey.OP_ACCEPT, this);
}
_key = _channel.register(selector, SelectionKey.OP_ACCEPT, this);
if (LOG.isDebugEnabled())
LOG.debug("{} acceptor={}", this, _key);
LOG.debug("{} acceptor={}", this, _channel);
}
catch (Throwable x)
{
@ -679,13 +674,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public Runnable onSelected()
{
SelectableChannel server = _key.channel();
SelectableChannel channel = null;
try
{
while (true)
{
channel = _selectorManager.doAccept(server);
channel = _selectorManager.doAccept(_channel);
if (channel == null)
break;
_selectorManager.accepted(channel);
@ -693,10 +687,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
catch (Throwable x)
{
LOG.warn("Accept failed for channel {}", channel, x);
IO.close(channel);
LOG.warn("Accept failed for channel " + channel, x);
}
return null;
}
@ -709,8 +702,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public void close() throws IOException
{
SelectionKey key = _key;
_key = null;
if (key != null && key.isValid())
if (key != null)
key.cancel();
}
}
@ -731,7 +723,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public void close()
{
LOG.debug("closed accept of {}", channel);
if (LOG.isDebugEnabled())
LOG.debug("closed accept of {}", channel);
IO.close(channel);
}
@ -747,7 +740,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
IO.close(channel);
_selectorManager.onAcceptFailed(channel, x);
LOG.debug(x);
if (LOG.isDebugEnabled())
LOG.debug(x);
}
}
@ -761,7 +755,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
catch (Throwable x)
{
LOG.debug(x);
if (LOG.isDebugEnabled())
LOG.debug(x);
failed(x);
}
}
@ -770,9 +765,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
IO.close(channel);
LOG.warn(String.valueOf(failure));
LOG.debug(failure);
if (LOG.isDebugEnabled())
LOG.debug(failure);
_selectorManager.onAcceptFailed(channel, failure);
}
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), channel);
}
}
class Connect implements SelectorUpdate, Runnable
@ -832,16 +834,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private class CloseConnections implements SelectorUpdate
{
final Set<Closeable> _closed;
final CountDownLatch _noEndPoints = new CountDownLatch(1);
final CountDownLatch _complete = new CountDownLatch(1);
private final Set<Closeable> _closed;
private final CountDownLatch _complete = new CountDownLatch(1);
public CloseConnections()
private CloseConnections()
{
this(null);
}
public CloseConnections(Set<Closeable> closed)
private CloseConnections(Set<Closeable> closed)
{
_closed = closed;
}
@ -851,7 +852,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
if (LOG.isDebugEnabled())
LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this);
boolean zero = true;
for (SelectionKey key : selector.keys())
{
if (key != null && key.isValid())
@ -860,14 +860,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
{
EndPoint endp = (EndPoint)attachment;
if (!endp.isOutputShutdown())
zero = false;
Connection connection = endp.getConnection();
EndPoint endPoint = (EndPoint)attachment;
Connection connection = endPoint.getConnection();
if (connection != null)
closeable = connection;
else
closeable = endp;
closeable = endPoint;
}
if (closeable != null)
@ -884,30 +882,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
}
}
if (zero)
_noEndPoints.countDown();
_complete.countDown();
}
}
private class StopSelector implements SelectorUpdate
{
CountDownLatch _stopped = new CountDownLatch(1);
private final CountDownLatch _stopped = new CountDownLatch(1);
@Override
public void update(Selector selector)
{
for (SelectionKey key : selector.keys())
{
if (key != null && key.isValid())
{
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
IO.close((EndPoint)attachment);
}
// Key may be null when using the UnixSocket selector.
if (key == null)
continue;
Object attachment = key.attachment();
if (attachment instanceof Closeable)
IO.close((Closeable)attachment);
}
_selector = null;
IO.close(selector);
_stopped.countDown();
@ -936,7 +930,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
IO.close(_connect.channel);
LOG.warn(String.valueOf(failure));
LOG.debug(failure);
if (LOG.isDebugEnabled())
LOG.debug(failure);
_connect.failed(failure);
}
}
@ -944,7 +939,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public String toString()
{
return String.format("CreateEndPoint@%x{%s,%s}", hashCode(), _connect, _key);
return String.format("CreateEndPoint@%x{%s}", hashCode(), _connect);
}
}

View File

@ -214,7 +214,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public void accept(SelectableChannel channel, Object attachment)
{
final ManagedSelector selector = chooseSelector();
ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel, attachment));
}
@ -229,7 +229,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public Closeable acceptor(SelectableChannel server)
{
final ManagedSelector selector = chooseSelector();
ManagedSelector selector = chooseSelector();
ManagedSelector.Acceptor acceptor = selector.new Acceptor(server);
selector.submit(acceptor);
return acceptor;

View File

@ -18,24 +18,15 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class SocketChannelEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(SocketChannelEndPoint.class);
private final Socket _socket;
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
this((SocketChannel)channel, selector, key, scheduler);
@ -44,40 +35,10 @@ public class SocketChannelEndPoint extends ChannelEndPoint
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel, selector, key, scheduler);
_socket = channel.socket();
_local = (InetSocketAddress)_socket.getLocalSocketAddress();
_remote = (InetSocketAddress)_socket.getRemoteSocketAddress();
}
public Socket getSocket()
{
return _socket;
}
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
@Override
protected void doShutdownOutput()
{
try
{
if (!_socket.isOutputShutdown())
_socket.shutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
return getChannel().socket();
}
}

View File

@ -33,12 +33,15 @@ public class UnixSocketEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private final UnixSocketChannel _channel;
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel, selector, key, scheduler);
_channel = channel;
}
@Override
public UnixSocketChannel getChannel()
{
return (UnixSocketChannel)super.getChannel();
}
@Override
@ -56,11 +59,9 @@ public class UnixSocketEndPoint extends ChannelEndPoint
@Override
protected void doShutdownOutput()
{
if (LOG.isDebugEnabled())
LOG.debug("oshut {}", this);
try
{
_channel.shutdownOutput();
getChannel().shutdownOutput();
super.doShutdownOutput();
}
catch (IOException e)

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
@ -145,6 +146,7 @@ public class UnixSocketTest
assertThat(contentResponse.getContentAsString(), containsString("Hello World"));
}
@Tag("external")
@Test
public void testNotLocal() throws Exception
{

View File

@ -1,5 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.LEVEL=WARN
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.websocket.LEVEL=DEBUG