Merge pull request #4823 from eclipse/jetty-9.4.x-4798-recover_selector_failures

Issue #4798 - Recover from Selector Failures
This commit is contained in:
Simone Bordet 2020-05-04 17:43:28 +02:00 committed by GitHub
commit 0ab2b42055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 656 additions and 213 deletions

View File

@ -458,11 +458,11 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
name = c.getSimpleName(); name = c.getSimpleName();
} }
return String.format("%s@%h{%s<->%s,%s,fill=%s,flush=%s,to=%d/%d}", return String.format("%s@%h{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}",
name, name,
this, this,
getRemoteAddress(),
getLocalAddress(), getLocalAddress(),
getRemoteAddress(),
_state.get(), _state.get(),
_fillInterest.toStateString(), _fillInterest.toStateString(),
_writeFlusher.toStateString(), _writeFlusher.toStateString(),

View File

@ -20,12 +20,13 @@ package org.eclipse.jetty.io;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -41,21 +42,14 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
{ {
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
private final ByteChannel _channel; private final SocketChannel _channel;
private final GatheringByteChannel _gather; private final ManagedSelector _selector;
protected final ManagedSelector _selector; private SelectionKey _key;
protected final SelectionKey _key;
private boolean _updatePending; private boolean _updatePending;
// The current value for interestOps.
/** private int _currentInterestOps;
* The current value for {@link SelectionKey#interestOps()}. // The desired value for interestOps.
*/ private int _desiredInterestOps;
protected int _currentInterestOps;
/**
* The desired value for {@link SelectionKey#interestOps()}.
*/
protected int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable private abstract class RunnableTask implements Runnable, Invocable
{ {
@ -94,14 +88,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
} }
private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate() private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
{
@Override
public void update(Selector selector)
{
updateKey();
}
};
private final Runnable _runFillable = new RunnableCloseable("runFillable") private final Runnable _runFillable = new RunnableCloseable("runFillable")
{ {
@ -166,13 +153,24 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
}; };
public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) public ChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{ {
super(scheduler); super(scheduler);
_channel = channel; _channel = channel;
_selector = selector; _selector = selector;
_key = key; _key = key;
_gather = (channel instanceof GatheringByteChannel) ? (GatheringByteChannel)channel : null; }
@Override
public InetSocketAddress getLocalAddress()
{
return (InetSocketAddress)_channel.socket().getLocalSocketAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return (InetSocketAddress)_channel.socket().getRemoteSocketAddress();
} }
@Override @Override
@ -187,6 +185,21 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return _channel.isOpen(); return _channel.isOpen();
} }
@Override
protected void doShutdownOutput()
{
try
{
Socket socket = _channel.socket();
if (!socket.isOutputShutdown())
socket.shutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
}
@Override @Override
public void doClose() public void doClose()
{ {
@ -254,27 +267,10 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override @Override
public boolean flush(ByteBuffer... buffers) throws IOException public boolean flush(ByteBuffer... buffers) throws IOException
{ {
long flushed = 0; long flushed;
try try
{ {
if (buffers.length == 1) flushed = _channel.write(buffers);
flushed = _channel.write(buffers[0]);
else if (_gather != null && buffers.length > 1)
flushed = _gather.write(buffers, 0, buffers.length);
else
{
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l = _channel.write(b);
if (l > 0)
flushed += l;
if (b.hasRemaining())
break;
}
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("flushed {} {}", flushed, this); LOG.debug("flushed {} {}", flushed, this);
} }
@ -295,7 +291,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return true; return true;
} }
public ByteChannel getChannel() public SocketChannel getChannel()
{ {
return _channel; return _channel;
} }
@ -321,9 +317,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override @Override
public Runnable onSelected() public Runnable onSelected()
{ {
/** // This method runs from the selector thread,
* This method may run concurrently with {@link #changeInterests(int)}. // possibly concurrently with changeInterests(int).
*/
int readyOps = _key.readyOps(); int readyOps = _key.readyOps();
int oldInterestOps; int oldInterestOps;
@ -357,12 +352,16 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return task; return task;
} }
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override @Override
public void updateKey() public void updateKey()
{ {
/** // This method runs from the selector thread,
* This method may run concurrently with {@link #changeInterests(int)}. // possibly concurrently with changeInterests(int).
*/
try try
{ {
@ -385,22 +384,27 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
catch (CancelledKeyException x) catch (CancelledKeyException x)
{ {
LOG.debug("Ignoring key update for concurrently closed channel {}", this); if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close(); close();
} }
catch (Throwable x) catch (Throwable x)
{ {
LOG.warn("Ignoring key update for " + this, x); LOG.warn("Ignoring key update for {}", this, x);
close(); close();
} }
} }
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
private void changeInterests(int operation) private void changeInterests(int operation)
{ {
/** // This method runs from any thread, possibly
* This method may run concurrently with // concurrently with updateKey() and onSelected().
* {@link #updateKey()} and {@link #onSelected()}.
*/
int oldInterestOps; int oldInterestOps;
int newInterestOps; int newInterestOps;

View File

@ -79,7 +79,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
private final AtomicBoolean _started = new AtomicBoolean(false); private final AtomicBoolean _started = new AtomicBoolean(false);
private boolean _selecting = false; private boolean _selecting;
private final SelectorManager _selectorManager; private final SelectorManager _selectorManager;
private final int _id; private final int _id;
private final ExecutionStrategy _strategy; private final ExecutionStrategy _strategy;
@ -123,22 +123,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
start._started.await(); start._started.await();
} }
protected void onSelectFailed(Throwable cause)
{
// override to change behavior
}
public int size()
{
Selector s = _selector;
if (s == null)
return 0;
Set<SelectionKey> keys = s.keys();
if (keys == null)
return 0;
return keys.size();
}
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
@ -160,22 +144,119 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
super.doStop(); super.doStop();
} }
protected int nioSelect(Selector selector, boolean now) throws IOException
{
return now ? selector.selectNow() : selector.select();
}
protected int select(Selector selector) throws IOException
{
try
{
int selected = nioSelect(selector, false);
if (selected == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken with none selected", selector);
if (Thread.interrupted() && !isRunning())
throw new ClosedSelectorException();
if (FORCE_SELECT_NOW)
selected = nioSelect(selector, true);
}
return selected;
}
catch (ClosedSelectorException x)
{
throw x;
}
catch (Throwable x)
{
handleSelectFailure(selector, x);
return 0;
}
}
protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException
{
LOG.info("Caught select() failure, trying to recover: {}", failure.toString());
if (LOG.isDebugEnabled())
LOG.debug(failure);
Selector newSelector = _selectorManager.newSelector();
for (SelectionKey oldKey : selector.keys())
{
SelectableChannel channel = oldKey.channel();
int interestOps = safeInterestOps(oldKey);
if (interestOps >= 0)
{
try
{
Object attachment = oldKey.attachment();
SelectionKey newKey = channel.register(newSelector, interestOps, attachment);
if (attachment instanceof Selectable)
((Selectable)attachment).replaceKey(newKey);
oldKey.cancel();
if (LOG.isDebugEnabled())
LOG.debug("Transferred {} iOps={} att={}", channel, interestOps, attachment);
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not transfer {}", channel, t);
IO.close(channel);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Invalid interestOps for {}", channel);
IO.close(channel);
}
}
IO.close(selector);
_selector = newSelector;
}
protected void onSelectFailed(Throwable cause)
{
// override to change behavior
}
public int size()
{
Selector s = _selector;
if (s == null)
return 0;
Set<SelectionKey> keys = s.keys();
if (keys == null)
return 0;
return keys.size();
}
/** /**
* Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()} * Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()}
* *
* @param update The selector update to apply at next wakeup * @param update The selector update to apply at next wakeup
*/ */
public void submit(SelectorUpdate update) public void submit(SelectorUpdate update)
{
submit(update, false);
}
private void submit(SelectorUpdate update, boolean lazy)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", update, this); LOG.debug("Queued change lazy={} {} on {}", lazy, update, this);
Selector selector = null; Selector selector = null;
synchronized (ManagedSelector.this) synchronized (ManagedSelector.this)
{ {
_updates.offer(update); _updates.offer(update);
if (_selecting) if (_selecting && !lazy)
{ {
selector = _selector; selector = _selector;
// To avoid the extra select wakeup. // To avoid the extra select wakeup.
@ -223,7 +304,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private void processConnect(SelectionKey key, final Connect connect) private void processConnect(SelectionKey key, Connect connect)
{ {
SelectableChannel channel = key.channel(); SelectableChannel channel = key.channel();
try try
@ -270,7 +351,18 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection); endPoint.setConnection(connection);
selectionKey.attach(endPoint); submit(selector ->
{
SelectionKey key = selectionKey;
if (key.selector() != selector)
{
key = channel.keyFor(selector);
if (key != null && endPoint instanceof Selectable)
((Selectable)endPoint).replaceKey(key);
}
if (key != null)
key.attach(endPoint);
}, true);
endPoint.onOpen(); endPoint.onOpen();
endPointOpened(endPoint); endPointOpened(endPoint);
_selectorManager.connectionOpened(connection); _selectorManager.connectionOpened(connection);
@ -278,7 +370,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Created {}", endPoint); LOG.debug("Created {}", endPoint);
} }
public void destroyEndPoint(final EndPoint endPoint) void destroyEndPoint(EndPoint endPoint)
{ {
// Waking up the selector is necessary to clean the // Waking up the selector is necessary to clean the
// cancelled-key set and tell the TCP stack that the // cancelled-key set and tell the TCP stack that the
@ -386,6 +478,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
* {@link ManagedSelector} for this endpoint have been processed. * {@link ManagedSelector} for this endpoint have been processed.
*/ */
void updateKey(); void updateKey();
/**
* Callback method invoked when the SelectionKey is replaced
* because the channel has been moved to a new selector.
*
* @param newKey the new SelectionKey
*/
void replaceKey(SelectionKey newKey);
} }
private class SelectorProducer implements ExecutionStrategy.Producer private class SelectorProducer implements ExecutionStrategy.Producer
@ -465,22 +565,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
try try
{ {
Selector selector = _selector; Selector selector = _selector;
if (selector != null && selector.isOpen()) if (selector != null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size()); LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
int selected = selector.select(); int selected = ManagedSelector.this.select(selector);
if (selected == 0) // The selector may have been recreated.
selector = _selector;
if (selector != null)
{ {
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken with none selected", selector);
if (Thread.interrupted() && !isRunning())
throw new ClosedSelectorException();
if (FORCE_SELECT_NOW)
selected = selector.selectNow();
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size()); LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());
@ -500,6 +593,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
return true; return true;
} }
} }
}
catch (Throwable x) catch (Throwable x)
{ {
IO.close(_selector); IO.close(_selector);
@ -513,6 +607,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
else else
{ {
LOG.warn(x.toString()); LOG.warn(x.toString());
if (LOG.isDebugEnabled())
LOG.debug(x); LOG.debug(x);
} }
} }
@ -524,9 +619,10 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
while (_cursor.hasNext()) while (_cursor.hasNext())
{ {
SelectionKey key = _cursor.next(); SelectionKey key = _cursor.next();
Object attachment = key.attachment();
SelectableChannel channel = key.channel();
if (key.isValid()) if (key.isValid())
{ {
Object attachment = key.attachment();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment); LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment);
try try
@ -549,24 +645,21 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (CancelledKeyException x) catch (CancelledKeyException x)
{ {
LOG.debug("Ignoring cancelled key for channel {}", key.channel()); if (LOG.isDebugEnabled())
if (attachment instanceof EndPoint) LOG.debug("Ignoring cancelled key for channel {}", channel);
IO.close((EndPoint)attachment); IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
} }
catch (Throwable x) catch (Throwable x)
{ {
LOG.warn("Could not process key for channel " + key.channel(), x); LOG.warn("Could not process key for channel {}", channel, x);
if (attachment instanceof EndPoint) IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
IO.close((EndPoint)attachment);
} }
} }
else else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); LOG.debug("Selector loop ignoring invalid key for channel {}", channel);
Object attachment = key.attachment(); IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
if (attachment instanceof EndPoint)
IO.close((EndPoint)attachment);
} }
} }
return null; return null;
@ -615,7 +708,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private static class DumpKeys implements SelectorUpdate private static class DumpKeys implements SelectorUpdate
{ {
private CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
private List<String> keys; private List<String> keys;
@Override @Override
@ -651,23 +744,19 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private final SelectableChannel _channel; private final SelectableChannel _channel;
private SelectionKey _key; private SelectionKey _key;
public Acceptor(SelectableChannel channel) Acceptor(SelectableChannel channel)
{ {
this._channel = channel; _channel = channel;
} }
@Override @Override
public void update(Selector selector) public void update(Selector selector)
{ {
try try
{
if (_key == null)
{ {
_key = _channel.register(selector, SelectionKey.OP_ACCEPT, this); _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this);
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} acceptor={}", this, _key); LOG.debug("{} acceptor={}", this, _channel);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -679,13 +768,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override @Override
public Runnable onSelected() public Runnable onSelected()
{ {
SelectableChannel server = _key.channel();
SelectableChannel channel = null; SelectableChannel channel = null;
try try
{ {
while (true) while (true)
{ {
channel = _selectorManager.doAccept(server); channel = _selectorManager.doAccept(_channel);
if (channel == null) if (channel == null)
break; break;
_selectorManager.accepted(channel); _selectorManager.accepted(channel);
@ -693,10 +781,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
LOG.warn("Accept failed for channel {}", channel, x);
IO.close(channel); IO.close(channel);
LOG.warn("Accept failed for channel " + channel, x);
} }
return null; return null;
} }
@ -705,13 +792,18 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
} }
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
SelectionKey key = _key; // May be called from any thread.
_key = null; // Implements AbstractConnector.setAccepting(boolean).
if (key != null && key.isValid()) submit(selector -> _key.cancel());
key.cancel();
} }
} }
@ -731,6 +823,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override @Override
public void close() public void close()
{ {
if (LOG.isDebugEnabled())
LOG.debug("closed accept of {}", channel); LOG.debug("closed accept of {}", channel);
IO.close(channel); IO.close(channel);
} }
@ -747,6 +840,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
IO.close(channel); IO.close(channel);
_selectorManager.onAcceptFailed(channel, x); _selectorManager.onAcceptFailed(channel, x);
if (LOG.isDebugEnabled())
LOG.debug(x); LOG.debug(x);
} }
} }
@ -761,6 +855,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled())
LOG.debug(x); LOG.debug(x);
failed(x); failed(x);
} }
@ -770,9 +865,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
IO.close(channel); IO.close(channel);
LOG.warn(String.valueOf(failure)); LOG.warn(String.valueOf(failure));
if (LOG.isDebugEnabled())
LOG.debug(failure); LOG.debug(failure);
_selectorManager.onAcceptFailed(channel, failure); _selectorManager.onAcceptFailed(channel, failure);
} }
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), channel);
}
} }
class Connect implements SelectorUpdate, Runnable class Connect implements SelectorUpdate, Runnable
@ -832,16 +934,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private class CloseConnections implements SelectorUpdate private class CloseConnections implements SelectorUpdate
{ {
final Set<Closeable> _closed; private final Set<Closeable> _closed;
final CountDownLatch _noEndPoints = new CountDownLatch(1); private final CountDownLatch _complete = new CountDownLatch(1);
final CountDownLatch _complete = new CountDownLatch(1);
public CloseConnections() private CloseConnections()
{ {
this(null); this(null);
} }
public CloseConnections(Set<Closeable> closed) private CloseConnections(Set<Closeable> closed)
{ {
_closed = closed; _closed = closed;
} }
@ -851,7 +952,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this); LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this);
boolean zero = true;
for (SelectionKey key : selector.keys()) for (SelectionKey key : selector.keys())
{ {
if (key != null && key.isValid()) if (key != null && key.isValid())
@ -860,14 +960,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
{ {
EndPoint endp = (EndPoint)attachment; EndPoint endPoint = (EndPoint)attachment;
if (!endp.isOutputShutdown()) Connection connection = endPoint.getConnection();
zero = false;
Connection connection = endp.getConnection();
if (connection != null) if (connection != null)
closeable = connection; closeable = connection;
else else
closeable = endp; closeable = endPoint;
} }
if (closeable != null) if (closeable != null)
@ -884,30 +982,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
} }
if (zero)
_noEndPoints.countDown();
_complete.countDown(); _complete.countDown();
} }
} }
private class StopSelector implements SelectorUpdate private class StopSelector implements SelectorUpdate
{ {
CountDownLatch _stopped = new CountDownLatch(1); private final CountDownLatch _stopped = new CountDownLatch(1);
@Override @Override
public void update(Selector selector) public void update(Selector selector)
{ {
for (SelectionKey key : selector.keys()) for (SelectionKey key : selector.keys())
{ {
if (key != null && key.isValid()) // Key may be null when using the UnixSocket selector.
{ if (key == null)
continue;
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof EndPoint) if (attachment instanceof Closeable)
IO.close((EndPoint)attachment); IO.close((Closeable)attachment);
} }
}
_selector = null; _selector = null;
IO.close(selector); IO.close(selector);
_stopped.countDown(); _stopped.countDown();
@ -936,6 +1030,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
IO.close(_connect.channel); IO.close(_connect.channel);
LOG.warn(String.valueOf(failure)); LOG.warn(String.valueOf(failure));
if (LOG.isDebugEnabled())
LOG.debug(failure); LOG.debug(failure);
_connect.failed(failure); _connect.failed(failure);
} }
@ -944,7 +1039,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override @Override
public String toString() public String toString()
{ {
return String.format("CreateEndPoint@%x{%s,%s}", hashCode(), _connect, _key); return String.format("CreateEndPoint@%x{%s}", hashCode(), _connect);
} }
} }

View File

@ -214,7 +214,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/ */
public void accept(SelectableChannel channel, Object attachment) public void accept(SelectableChannel channel, Object attachment)
{ {
final ManagedSelector selector = chooseSelector(); ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel, attachment)); selector.submit(selector.new Accept(channel, attachment));
} }
@ -229,7 +229,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/ */
public Closeable acceptor(SelectableChannel server) public Closeable acceptor(SelectableChannel server)
{ {
final ManagedSelector selector = chooseSelector(); ManagedSelector selector = chooseSelector();
ManagedSelector.Acceptor acceptor = selector.new Acceptor(server); ManagedSelector.Acceptor acceptor = selector.new Acceptor(server);
selector.submit(acceptor); selector.submit(acceptor);
return acceptor; return acceptor;

View File

@ -18,24 +18,15 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
public class SocketChannelEndPoint extends ChannelEndPoint public class SocketChannelEndPoint extends ChannelEndPoint
{ {
private static final Logger LOG = Log.getLogger(SocketChannelEndPoint.class);
private final Socket _socket;
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{ {
this((SocketChannel)channel, selector, key, scheduler); this((SocketChannel)channel, selector, key, scheduler);
@ -44,40 +35,10 @@ public class SocketChannelEndPoint extends ChannelEndPoint
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{ {
super(channel, selector, key, scheduler); super(channel, selector, key, scheduler);
_socket = channel.socket();
_local = (InetSocketAddress)_socket.getLocalSocketAddress();
_remote = (InetSocketAddress)_socket.getRemoteSocketAddress();
} }
public Socket getSocket() public Socket getSocket()
{ {
return _socket; return getChannel().socket();
}
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
@Override
protected void doShutdownOutput()
{
try
{
if (!_socket.isOutputShutdown())
_socket.shutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
} }
} }

View File

@ -33,12 +33,15 @@ public class UnixSocketEndPoint extends ChannelEndPoint
{ {
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class); private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private final UnixSocketChannel _channel;
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{ {
super(channel, selector, key, scheduler); super(channel, selector, key, scheduler);
_channel = channel; }
@Override
public UnixSocketChannel getChannel()
{
return (UnixSocketChannel)super.getChannel();
} }
@Override @Override
@ -56,11 +59,9 @@ public class UnixSocketEndPoint extends ChannelEndPoint
@Override @Override
protected void doShutdownOutput() protected void doShutdownOutput()
{ {
if (LOG.isDebugEnabled())
LOG.debug("oshut {}", this);
try try
{ {
_channel.shutdownOutput(); getChannel().shutdownOutput();
super.doShutdownOutput(); super.doShutdownOutput();
} }
catch (IOException e) catch (IOException e)

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.EnabledOnOs;
@ -145,6 +146,7 @@ public class UnixSocketTest
assertThat(contentResponse.getContentAsString(), containsString("Hello World")); assertThat(contentResponse.getContentAsString(), containsString("Hello World"));
} }
@Tag("external")
@Test @Test
public void testNotLocal() throws Exception public void testNotLocal() throws Exception
{ {

View File

@ -75,6 +75,7 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ArgumentsSource;
@ -398,6 +399,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(TransportProvider.class) @ArgumentsSource(TransportProvider.class)
@Tag("Unstable") @Tag("Unstable")
@Disabled
public void testAsyncWriteClosed(Transport transport) throws Exception public void testAsyncWriteClosed(Transport transport) throws Exception
{ {
init(transport); init(transport);

View File

@ -0,0 +1,379 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.test;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RecoverFailedSelectorTest
{
private Server server;
private ServerConnector connector;
private void start(Function<Server, ServerConnector> consumer) throws Exception
{
server = new Server();
connector = consumer.apply(server);
server.addConnector(connector);
server.start();
}
@AfterEach
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testSelectFailureBetweenReads() throws Exception
{
// There will be 3 calls to select(): one at start(),
// one to accept, and one to set read interest.
CountDownLatch selectLatch = new CountDownLatch(3);
CountDownLatch failureLatch = new CountDownLatch(1);
AtomicBoolean fail = new AtomicBoolean();
start(server -> new ServerConnector(server, 1, 1)
{
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id)
{
@Override
protected int nioSelect(Selector selector, boolean now) throws IOException
{
selectLatch.countDown();
if (fail.getAndSet(false))
throw new IOException("explicit select() failure");
return super.nioSelect(selector, now);
}
@Override
protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException
{
super.handleSelectFailure(selector, failure);
failureLatch.countDown();
}
};
}
};
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
assertTrue(selectLatch.await(5, TimeUnit.SECONDS));
String request = "GET / HTTP/1.0\r\n\r\n";
int split = request.length() / 2;
ByteBuffer chunk1 = StandardCharsets.UTF_8.encode(request.substring(0, split));
ByteBuffer chunk2 = StandardCharsets.UTF_8.encode(request.substring(split));
// Wake up the selector and fail it.
fail.set(true);
client.write(chunk1);
// Wait for the failure handling to be completed.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Write the rest of the request, the
// server should be able to continue.
client.write(chunk2);
HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client));
assertNotNull(response);
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
}
}
@Test
public void testAcceptDuringSelectFailure() throws Exception
{
// There will be 3 calls to select(): one at start(),
// one to accept, and one to set read interest.
CountDownLatch selectLatch = new CountDownLatch(3);
CountDownLatch failureLatch = new CountDownLatch(1);
AtomicBoolean fail = new AtomicBoolean();
AtomicReference<SocketChannel> socketRef = new AtomicReference<>();
start(server -> new ServerConnector(server, 1, 1)
{
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id)
{
@Override
protected int nioSelect(Selector selector, boolean now) throws IOException
{
selectLatch.countDown();
if (fail.getAndSet(false))
throw new IOException("explicit select() failure");
return super.nioSelect(selector, now);
}
@Override
protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException
{
// Before handling the failure, connect with another socket.
SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()));
socketRef.set(socket);
super.handleSelectFailure(selector, failure);
failureLatch.countDown();
}
};
}
};
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
assertTrue(selectLatch.await(5, TimeUnit.SECONDS));
String request = "GET / HTTP/1.0\r\n\r\n";
ByteBuffer buffer = StandardCharsets.UTF_8.encode(request);
// Wake up the selector and fail it.
fail.set(true);
client.write(buffer);
// Wait for the failure handling to be completed.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client));
assertNotNull(response);
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
// Verify that the newly created socket works well.
SocketChannel socket = socketRef.get();
buffer.flip();
socket.write(buffer);
response = HttpTester.parseResponse(HttpTester.from(socket));
assertNotNull(response);
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
}
}
@Test
public void testSelectFailureDuringEndPointCreation() throws Exception
{
// There will be 2 calls to select(): one at start(), one to accept.
CountDownLatch selectLatch = new CountDownLatch(2);
CountDownLatch failureLatch = new CountDownLatch(1);
AtomicBoolean fail = new AtomicBoolean();
CountDownLatch endPointLatch1 = new CountDownLatch(1);
CountDownLatch endPointLatch2 = new CountDownLatch(1);
start(server -> new ServerConnector(server, 1, 1)
{
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id)
{
@Override
protected int nioSelect(Selector selector, boolean now) throws IOException
{
selectLatch.countDown();
if (fail.getAndSet(false))
throw new IOException("explicit select() failure");
return super.nioSelect(selector, now);
}
@Override
protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException
{
super.handleSelectFailure(selector, failure);
failureLatch.countDown();
}
};
}
@Override
protected ChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
try
{
ChannelEndPoint endPoint = super.newEndPoint(channel, selectSet, selectionKey);
endPointLatch1.countDown();
assertTrue(endPointLatch2.await(5, TimeUnit.SECONDS));
return endPoint;
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
};
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
assertTrue(selectLatch.await(5, TimeUnit.SECONDS));
// Wait until the server EndPoint instance is created.
assertTrue(endPointLatch1.await(5, TimeUnit.SECONDS));
// Wake up the selector and fail it.
fail.set(true);
SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())).close();
// Wait until the selector is replaced.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Continue the EndPoint creation.
endPointLatch2.countDown();
String request = "GET / HTTP/1.0\r\n\r\n";
ByteBuffer buffer = StandardCharsets.UTF_8.encode(request);
client.write(buffer);
HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client));
assertNotNull(response);
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
}
}
@Test
public void testSelectFailureDuringEndPointCreatedThenClosed() throws Exception
{
// There will be 2 calls to select(): one at start(), one to accept.
CountDownLatch selectLatch = new CountDownLatch(2);
CountDownLatch failureLatch = new CountDownLatch(1);
AtomicBoolean fail = new AtomicBoolean();
CountDownLatch connectionLatch1 = new CountDownLatch(1);
CountDownLatch connectionLatch2 = new CountDownLatch(1);
start(server -> new ServerConnector(server, 1, 1)
{
@Override
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new ServerConnectorManager(executor, scheduler, selectors)
{
@Override
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id)
{
@Override
protected int nioSelect(Selector selector, boolean now) throws IOException
{
selectLatch.countDown();
if (fail.getAndSet(false))
throw new IOException("explicit select() failure");
return super.nioSelect(selector, now);
}
@Override
protected void handleSelectFailure(Selector selector, Throwable failure) throws IOException
{
super.handleSelectFailure(selector, failure);
failureLatch.countDown();
}
};
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
try
{
Connection connection = super.newConnection(channel, endPoint, attachment);
endPoint.close();
connectionLatch1.countDown();
assertTrue(connectionLatch2.await(5, TimeUnit.SECONDS));
return connection;
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
};
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
assertTrue(selectLatch.await(5, TimeUnit.SECONDS));
// Wait until the server EndPoint is closed.
assertTrue(connectionLatch1.await(5, TimeUnit.SECONDS));
// Wake up the selector and fail it.
fail.set(true);
SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())).close();
// Wait until the selector is replaced.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Continue the server processing.
connectionLatch2.countDown();
// The channel has been closed on the server.
int read = client.read(ByteBuffer.allocate(1));
assertTrue(read < 0);
}
}
}

View File

@ -1,5 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog #org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.LEVEL=WARN
#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.websocket.LEVEL=DEBUG #org.eclipse.jetty.websocket.LEVEL=DEBUG