Issue #2046 - Graceful stop of connections (#2100)

* Clean up of actions (now updates) prior to #2046 fix
* prevent exceptions from termincating lifecycle doStop or destroy
* Refactored ManagedSelector stop to always close endpoints
* Fixed NPE if SelectorManager is already stopped
* refactored after review
* further simplifications after review
* Wait only for oshut endpoints
* Cleanup from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2018-01-09 20:36:41 +01:00 committed by GitHub
parent 21365234f8
commit 356bf2e06f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 455 additions and 245 deletions

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -29,7 +30,6 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -89,10 +89,10 @@ public class LivelockTest
AtomicBoolean busy = new AtomicBoolean(true); AtomicBoolean busy = new AtomicBoolean(true);
ManagedSelector clientSelector = client.getContainedBeans(ManagedSelector.class).stream().findAny().get(); ManagedSelector clientSelector = client.getContainedBeans(ManagedSelector.class).stream().findAny().get();
Runnable clientLivelock = new Invocable.NonBlocking() ManagedSelector.SelectorUpdate clientLivelock = new ManagedSelector.SelectorUpdate()
{ {
@Override @Override
public void run() public void update(Selector selector)
{ {
sleep(10); sleep(10);
if (busy.get()) if (busy.get())
@ -102,10 +102,10 @@ public class LivelockTest
clientSelector.submit(clientLivelock); clientSelector.submit(clientLivelock);
ManagedSelector serverSelector = connector.getContainedBeans(ManagedSelector.class).stream().findAny().get(); ManagedSelector serverSelector = connector.getContainedBeans(ManagedSelector.class).stream().findAny().get();
Runnable serverLivelock = new Invocable.NonBlocking() ManagedSelector.SelectorUpdate serverLivelock = new ManagedSelector.SelectorUpdate()
{ {
@Override @Override
public void run() public void update(Selector selector)
{ {
sleep(10); sleep(10);
if (busy.get()) if (busy.get())

View File

@ -25,6 +25,7 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel; import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -41,7 +42,6 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
{ {
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
private final Locker _locker = new Locker();
private final ByteChannel _channel; private final ByteChannel _channel;
private final GatheringByteChannel _gather; private final GatheringByteChannel _gather;
protected final ManagedSelector _selector; protected final ManagedSelector _selector;
@ -95,16 +95,10 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
} }
private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey") private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate()
{ {
@Override @Override
public InvocationType getInvocationType() public void update(Selector selector)
{
return InvocationType.NON_BLOCKING;
}
@Override
public void run()
{ {
updateKey(); updateKey();
} }
@ -336,7 +330,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
int readyOps = _key.readyOps(); int readyOps = _key.readyOps();
int oldInterestOps; int oldInterestOps;
int newInterestOps; int newInterestOps;
try (Locker.Lock lock = _locker.lock()) synchronized(this)
{ {
_updatePending = true; _updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
@ -376,7 +370,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
{ {
int oldInterestOps; int oldInterestOps;
int newInterestOps; int newInterestOps;
try (Locker.Lock lock = _locker.lock()) synchronized(this)
{ {
_updatePending = false; _updatePending = false;
oldInterestOps = _currentInterestOps; oldInterestOps = _currentInterestOps;
@ -413,7 +407,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
int oldInterestOps; int oldInterestOps;
int newInterestOps; int newInterestOps;
boolean pending; boolean pending;
try (Locker.Lock lock = _locker.lock()) synchronized(this)
{ {
pending = _updatePending; pending = _updatePending;
oldInterestOps = _desiredInterestOps; oldInterestOps = _desiredInterestOps;
@ -426,7 +420,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector!=null) if (!pending && _selector!=null)
_selector.submit(_runUpdateKey); _selector.submit(_updateKeyAction);
} }
@Override @Override

View File

@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -40,6 +41,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -48,8 +50,6 @@ import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
@ -64,14 +64,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
private static final Logger LOG = Log.getLogger(ManagedSelector.class); private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final Locker _locker = new Locker(); private final AtomicBoolean _started = new AtomicBoolean(false);
private boolean _selecting = false; private boolean _selecting = false;
private final Deque<Runnable> _actions = new ArrayDeque<>();
private final SelectorManager _selectorManager; private final SelectorManager _selectorManager;
private final int _id; private final int _id;
private final ExecutionStrategy _strategy; private final ExecutionStrategy _strategy;
private Selector _selector; private Selector _selector;
private int _actionCount; private Deque<SelectorUpdate> _updates = new ArrayDeque<>();
private Deque<SelectorUpdate> _updateable = new ArrayDeque<>();
public ManagedSelector(SelectorManager selectorManager, int id) public ManagedSelector(SelectorManager selectorManager, int id)
{ {
@ -102,6 +102,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// The normal strategy obtains the produced task, schedules // The normal strategy obtains the produced task, schedules
// a new thread to produce more, runs the task and then exits. // a new thread to produce more, runs the task and then exits.
_selectorManager.execute(_strategy::produce); _selectorManager.execute(_strategy::produce);
// Set started only if we really are started
submit(s->_started.set(true));
} }
public int size() public int size()
@ -115,30 +118,63 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
if (LOG.isDebugEnabled()) // doStop might be called for a failed managedSelector,
LOG.debug("Stopping {}", this); // We do not want to wait twice, so we only stop once for each start
CloseEndPoints close_endps = new CloseEndPoints(); boolean timeout = false;
submit(close_endps); if (_started.compareAndSet(true,false))
close_endps.await(getStopTimeout()); {
CloseSelector close_selector = new CloseSelector(); if (getStopTimeout()>0)
submit(close_selector); {
close_selector.await(getStopTimeout()); // If we are graceful we can wait for connections to close
Set<Closeable> closed = new HashSet<>();
long now = System.nanoTime();
long wait_until = now+TimeUnit.MILLISECONDS.toNanos(getStopTimeout());
while(now<wait_until)
{
// Close any connection and wait for no endpoints in selector
CloseConnections close_connections = new CloseConnections(closed);
submit(close_connections);
if (close_connections._noEndPoints.await(100,TimeUnit.MILLISECONDS))
break;
now = System.nanoTime();
}
}
else
{
// Close connections, but only wait a single selector cycle for it to take effect
CloseConnections close_connections = new CloseConnections();
submit(close_connections);
close_connections._complete.await();
}
// Wait for any remaining endpoints to be closed and the selector to be stopped
StopSelector stop_selector = new StopSelector();
submit(stop_selector);
stop_selector._stopped.await();
timeout = getStopTimeout()>0 && stop_selector._forcedEndPointClose;
}
super.doStop(); super.doStop();
if (LOG.isDebugEnabled()) if (timeout)
LOG.debug("Stopped {}", this); throw new TimeoutException();
} }
public void submit(Runnable change) /**
* Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()}
* @param action
*/
public void submit(SelectorUpdate action)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", change, this); LOG.debug("Queued change {} on {}", action, this);
Selector selector = null; Selector selector = null;
try (Locker.Lock lock = _locker.lock()) synchronized(ManagedSelector.this)
{ {
_actions.offer(change); _updates.offer(action);
if (_selecting) if (_selecting)
{ {
@ -178,13 +214,22 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (connect.timeout.cancel()) if (connect.timeout.cancel())
{ {
key.interestOps(0); key.interestOps(0);
execute(new CreateEndPoint(channel, key) execute(new Runnable()
{ {
@Override @Override
protected void failed(Throwable failure) public void run()
{ {
super.failed(failure); try
connect.failed(failure); {
createEndPoint(channel,key);
}
catch(Throwable failure)
{
closeNoExceptions(channel);
LOG.warn(String.valueOf(failure));
LOG.debug(failure);
connect.failed(failure);
}
} }
}); });
} }
@ -204,7 +249,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private void closeNoExceptions(Closeable closeable) private static void closeNoExceptions(Closeable closeable)
{ {
try try
{ {
@ -237,9 +282,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private int getActionSize() private int getActionSize()
{ {
try (Locker.Lock lock = _locker.lock()) synchronized(ManagedSelector.this)
{ {
return _actions.size(); return _updates.size();
} }
} }
@ -248,16 +293,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
Selector selector = _selector; Selector selector = _selector;
List<String> keys = null; List<String> keys = null;
List<Runnable> actions = null; List<SelectorUpdate> actions = null;
if (selector != null && selector.isOpen()) if (selector != null && selector.isOpen())
{ {
DumpKeys dump = new DumpKeys(); DumpKeys dump = new DumpKeys();
String actionsAt; String actionsAt;
try (Locker.Lock lock = _locker.lock()) synchronized(ManagedSelector.this)
{ {
actionsAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); actionsAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
actions = new ArrayList<>(_actions); actions = new ArrayList<>(_updates);
_actions.addFirst(dump); _updates.addFirst(dump);
_selecting = false; _selecting = false;
} }
selector.wakeup(); selector.wakeup();
@ -321,9 +366,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (task != null) if (task != null)
return task; return task;
Runnable action = nextAction(); processUpdates();
if (action != null)
return action;
updateKeys(); updateKeys();
@ -332,61 +375,49 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private Runnable nextAction() private void processUpdates()
{ {
Selector selector = null; synchronized(ManagedSelector.this)
Runnable action = null;
try (Locker.Lock lock = _locker.lock())
{ {
// It is important to avoid live-lock (busy blocking) here. If too many actions Deque<SelectorUpdate> updates = _updates;
// are submitted, this can indefinitely defer selection happening. Similarly if _updates = _updateable;
// we give too much priority to selection, it may prevent actions from being run. _updateable = updates;
// The solution implemented here is to only process the number of actions that were
// originally in the action queue before attempting a select
if (_actionCount==0)
{
// Calculate how many actions we are prepared to handle before selection
_actionCount = _actions.size();
if (_actionCount>0)
action = _actions.poll();
else
_selecting = true;
}
else if (_actionCount==1)
{
_actionCount = 0;
if (LOG.isDebugEnabled())
LOG.debug("Forcing selection, actions={}",_actions.size());
if (_actions.size()==0)
{
// This was the last action, so select normally
_selecting = true;
}
else
{
// there are still more actions to handle, so
// immediately wake up (as if remaining action were just added).
selector = _selector;
_selecting = false;
}
}
else
{
_actionCount--;
action = _actions.poll();
}
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("action={} wakeup={}",action,selector!=null); LOG.debug("updateable {}",_updateable.size());
for (SelectorUpdate update : _updateable)
{
if (_selector==null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}",update);
update.update(_selector);
}
catch(Throwable th)
{
LOG.warn(th);
}
}
_updateable.clear();
Selector selector;
int actions;
synchronized(ManagedSelector.this)
{
actions = _updates.size();
_selecting = actions==0;
selector = _selecting?null:_selector;
}
if (LOG.isDebugEnabled())
LOG.debug("actions {}",actions);
if (selector != null) if (selector != null)
selector.wakeup(); selector.wakeup();
return action;
} }
private boolean select() private boolean select()
@ -403,11 +434,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Selector {} woken up from select, {}/{} selected", selector, selected, selector.keys().size()); LOG.debug("Selector {} woken up from select, {}/{} selected", selector, selected, selector.keys().size());
int actions; int actions;
try (Locker.Lock lock = _locker.lock()) synchronized(ManagedSelector.this)
{ {
// finished selecting // finished selecting
_selecting = false; _selecting = false;
actions = _actions.size(); actions = _updates.size();
} }
_keys = selector.selectedKeys(); _keys = selector.selectedKeys();
@ -506,33 +537,37 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private class DumpKeys extends Invocable.NonBlocking /**
* A selector update to be done when the selector has been woken.
*/
public interface SelectorUpdate
{
public void update(Selector selector);
}
private static class DumpKeys implements SelectorUpdate
{ {
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
private List<String> keys; private List<String> keys;
@Override @Override
public void run() public void update(Selector selector)
{ {
Selector selector = _selector; Set<SelectionKey> selector_keys = selector.keys();
if (selector != null && selector.isOpen()) List<String> list = new ArrayList<>(selector_keys.size()+1);
list.add(selector + " keys=" + selector_keys.size());
for (SelectionKey key : selector_keys)
{ {
Set<SelectionKey> selector_keys = selector.keys(); try
List<String> list = new ArrayList<>(selector_keys.size()+1);
list.add(selector + " keys=" + selector_keys.size());
for (SelectionKey key : selector_keys)
{ {
try list.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
{ }
list.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment())); catch (Throwable x)
} {
catch (Throwable x) list.add(String.format("SelectionKey@%x[%s]->%s", key.hashCode(), x, key.attachment()));
{
list.add(String.format("SelectionKey@%x[%s]->%s", key.hashCode(), x, key.attachment()));
}
} }
keys = list;
} }
keys = list;
latch.countDown(); latch.countDown();
} }
@ -551,7 +586,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
class Acceptor extends Invocable.NonBlocking implements Selectable, Closeable class Acceptor implements SelectorUpdate, Selectable, Closeable
{ {
private final SelectableChannel _channel; private final SelectableChannel _channel;
private SelectionKey _key; private SelectionKey _key;
@ -562,13 +597,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
@Override @Override
public void run() public void update(Selector selector)
{ {
try try
{ {
if (_key==null) if (_key==null)
{ {
_key = _channel.register(_selector, SelectionKey.OP_ACCEPT, this); _key = _channel.register(selector, SelectionKey.OP_ACCEPT, this);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -620,10 +655,11 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
class Accept extends Invocable.NonBlocking implements Closeable class Accept implements SelectorUpdate, Runnable, Closeable
{ {
private final SelectableChannel channel; private final SelectableChannel channel;
private final Object attachment; private final Object attachment;
private SelectionKey key;
Accept(SelectableChannel channel, Object attachment) Accept(SelectableChannel channel, Object attachment)
{ {
@ -639,12 +675,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
@Override @Override
public void run() public void update(Selector selector)
{ {
try try
{ {
final SelectionKey key = channel.register(_selector, 0, attachment); key = channel.register(selector, 0, attachment);
execute(new CreateEndPoint(channel, key)); execute(this);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -652,18 +688,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug(x); LOG.debug(x);
} }
} }
}
private class CreateEndPoint implements Runnable, Closeable
{
private final SelectableChannel channel;
private final SelectionKey key;
public CreateEndPoint(SelectableChannel channel, SelectionKey key)
{
this.channel = channel;
this.key = key;
}
@Override @Override
public void run() public void run()
@ -679,13 +703,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
@Override
public void close()
{
LOG.debug("closed creation of {}", channel);
closeNoExceptions(channel);
}
protected void failed(Throwable failure) protected void failed(Throwable failure)
{ {
closeNoExceptions(channel); closeNoExceptions(channel);
@ -694,7 +711,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
class Connect extends Invocable.NonBlocking
class Connect implements SelectorUpdate, Runnable
{ {
private final AtomicBoolean failed = new AtomicBoolean(); private final AtomicBoolean failed = new AtomicBoolean();
private final SelectableChannel channel; private final SelectableChannel channel;
@ -705,15 +723,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
this.channel = channel; this.channel = channel;
this.attachment = attachment; this.attachment = attachment;
this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS); this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(this, ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
} }
@Override @Override
public void run() public void update(Selector selector)
{ {
try try
{ {
channel.register(_selector, SelectionKey.OP_CONNECT, this); channel.register(selector, SelectionKey.OP_CONNECT, this);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -721,7 +739,18 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private void failed(Throwable failure) @Override
public void run()
{
if (_selectorManager.isConnectionPending(channel))
{
if (LOG.isDebugEnabled())
LOG.debug("Channel {} timed out while connecting, closing it", channel);
failed(new SocketTimeoutException("Connect Timeout"));
}
}
public void failed(Throwable failure)
{ {
if (failed.compareAndSet(false, true)) if (failed.compareAndSet(false, true))
{ {
@ -732,120 +761,97 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private class ConnectTimeout extends Invocable.NonBlocking private class CloseConnections implements SelectorUpdate
{ {
private final Connect connect; final Set<Closeable> _closed;
final CountDownLatch _noEndPoints = new CountDownLatch(1);
final CountDownLatch _complete = new CountDownLatch(1);
private ConnectTimeout(Connect connect) public CloseConnections()
{ {
this.connect = connect; this(null);
}
public CloseConnections(Set<Closeable> closed)
{
_closed = closed;
} }
@Override @Override
public void run() public void update(Selector selector)
{ {
SelectableChannel channel = connect.channel; if (LOG.isDebugEnabled())
if (_selectorManager.isConnectionPending(channel)) LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this);
boolean zero = true;
for (SelectionKey key : selector.keys())
{ {
if (LOG.isDebugEnabled()) if (key.isValid())
LOG.debug("Channel {} timed out while connecting, closing it", channel); {
connect.failed(new SocketTimeoutException("Connect Timeout")); Closeable closeable = null;
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
{
EndPoint endp = (EndPoint)attachment;
if (!endp.isOutputShutdown())
zero = false;
Connection connection = endp.getConnection();
if (connection != null)
closeable = connection;
else
closeable = endp;
}
if (closeable!=null)
{
if (_closed==null)
{
closeNoExceptions(closeable);
}
else if (!_closed.contains(closeable))
{
_closed.add(closeable);
closeNoExceptions(closeable);
}
}
}
} }
if (zero)
_noEndPoints.countDown();
_complete.countDown();
} }
} }
private class CloseEndPoints extends Invocable.NonBlocking private class StopSelector implements SelectorUpdate
{ {
private final CountDownLatch _latch = new CountDownLatch(1); CountDownLatch _stopped = new CountDownLatch(1);
private CountDownLatch _allClosed; boolean _forcedEndPointClose = false;
@Override @Override
public void run() public void update(Selector selector)
{ {
List<EndPoint> end_points = new ArrayList<>(); for (SelectionKey key : selector.keys())
for (SelectionKey key : _selector.keys())
{ {
if (key.isValid()) if (key.isValid())
{ {
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof EndPoint) if (attachment instanceof EndPoint)
end_points.add((EndPoint)attachment); {
EndPoint endp = (EndPoint)attachment;
if (!endp.isOutputShutdown())
_forcedEndPointClose = true;
closeNoExceptions((EndPoint)attachment);
}
} }
} }
int size = end_points.size();
if (LOG.isDebugEnabled())
LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this);
_allClosed = new CountDownLatch(size);
_latch.countDown();
for (EndPoint endp : end_points)
submit(new EndPointCloser(endp, _allClosed));
if (LOG.isDebugEnabled())
LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this);
}
public boolean await(long timeout)
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
_allClosed.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
return false;
}
}
}
private class EndPointCloser implements Runnable
{
private final EndPoint _endPoint;
private final CountDownLatch _latch;
private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
{
_endPoint = endPoint;
_latch = latch;
}
@Override
public void run()
{
closeNoExceptions(_endPoint.getConnection());
_latch.countDown();
}
}
private class CloseSelector extends Invocable.NonBlocking
{
private CountDownLatch _latch = new CountDownLatch(1);
@Override
public void run()
{
Selector selector = _selector;
_selector = null; _selector = null;
closeNoExceptions(selector); closeNoExceptions(selector);
_latch.countDown(); _stopped.countDown();
}
public boolean await(long timeout)
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
return false;
}
} }
} }
private class DestroyEndPoint implements Runnable, Closeable private class DestroyEndPoint implements Runnable, Closeable
{ {
private final EndPoint endPoint; private final EndPoint endPoint;

View File

@ -27,6 +27,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator; import java.util.function.IntUnaryOperator;
@ -64,6 +65,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private int _reservedThreads = -1; private int _reservedThreads = -1;
private ThreadPoolBudget.Lease _lease; private ThreadPoolBudget.Lease _lease;
private ReservedThreadExecutor _reservedThreadExecutor;
private static int defaultSelectors(Executor executor) private static int defaultSelectors(Executor executor)
{ {
@ -260,7 +262,8 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true); _reservedThreadExecutor = new ReservedThreadExecutor(getExecutor(),_reservedThreads,this);
addBean(_reservedThreadExecutor,true);
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length); _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length);
for (int i = 0; i < _selectors.length; i++) for (int i = 0; i < _selectors.length; i++)
{ {
@ -285,11 +288,25 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
super.doStop(); try
for (ManagedSelector selector : _selectors) {
removeBean(selector); super.doStop();
if (_lease != null) }
_lease.close(); finally
{
// Cleanup
for (ManagedSelector selector : _selectors)
{
if (selector!=null)
removeBean(selector);
}
Arrays.fill(_selectors,null);
if (_reservedThreadExecutor!=null)
removeBean(_reservedThreadExecutor);
_reservedThreadExecutor = null;
if (_lease != null)
_lease.close();
}
} }
/** /**

View File

@ -29,6 +29,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.ManagedSelector.SelectorUpdate;
import org.eclipse.jetty.io.ManagedSelector.Connect;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;

View File

@ -25,12 +25,15 @@ import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import java.io.BufferedReader;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -41,12 +44,18 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler; import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -266,6 +275,167 @@ public class GracefulStopTest
} }
public void testSlowClose(long stopTimeout, long closeWait, Matcher<Long> stopTimeMatcher) throws Exception
{
Server server= new Server();
server.setStopTimeout(stopTimeout);
CountDownLatch closed = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(server, 2, 2, new HttpConnectionFactory()
{
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
// Slow closing connection
HttpConnection conn = new HttpConnection(getHttpConfiguration(), connector, endPoint, getHttpCompliance(), isRecordHttpComplianceViolations())
{
@Override
public void close()
{
try
{
new Thread(()->
{
try
{
Thread.sleep(closeWait);
}
catch (InterruptedException e)
{
}
finally
{
super.close();
}
}).start();
}
catch(Exception e)
{
// e.printStackTrace();
}
finally
{
closed.countDown();
}
}
};
return configure(conn, connector, endPoint);
}
});
connector.setPort(0);
server.addConnector(connector);
NoopHandler handler = new NoopHandler();
server.setHandler(handler);
server.start();
final int port=connector.getLocalPort();
Socket client = new Socket("127.0.0.1", port);
client.setSoTimeout(10000);
client.getOutputStream().write((
"GET / HTTP/1.1\r\n"+
"Host: localhost:"+port+"\r\n" +
"Content-Type: plain/text\r\n" +
"\r\n"
).getBytes());
client.getOutputStream().flush();
handler.latch.await();
// look for a response
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream() ,StandardCharsets.ISO_8859_1));
while(true)
{
String line = in.readLine();
if (line==null)
Assert.fail();
if (line.length()==0)
break;
}
long start = System.nanoTime();
try
{
server.stop();
Assert.assertTrue(stopTimeout==0 || stopTimeout>closeWait);
}
catch(Exception e)
{
Assert.assertTrue(stopTimeout>0 && stopTimeout<closeWait);
}
long stop = System.nanoTime();
// Check stop time was correct
assertThat(TimeUnit.NANOSECONDS.toMillis(stop-start),stopTimeMatcher);
// Connection closed
while(true)
{
int r = client.getInputStream().read();
if (r==-1)
break;
}
// onClose Thread interrupted or completed
if (stopTimeout>0)
Assert.assertTrue(closed.await(1000,TimeUnit.MILLISECONDS));
if (!client.isClosed())
client.close();
}
/**
* Test of non graceful stop when a connection close is slow
* @throws Exception on test failure
*/
@Test
public void testSlowCloseNotGraceful() throws Exception
{
Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped");
testSlowClose(0,5000,lessThan(750L));
}
/**
* Test of graceful stop when close is slower than timeout
* @throws Exception on test failure
*/
@Test
public void testSlowCloseTinyGraceful() throws Exception
{
Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped");
testSlowClose(1,5000,lessThan(1500L));
}
/**
* Test of graceful stop when close is faster than timeout;
* @throws Exception on test failure
*/
@Test
public void testSlowCloseGraceful() throws Exception
{
testSlowClose(5000,1000,Matchers.allOf(greaterThan(750L),lessThan(4999L)));
}
static class NoopHandler extends AbstractHandler
{
final CountDownLatch latch = new CountDownLatch(1);
NoopHandler()
{
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
baseRequest.setHandled(true);
latch.countDown();
}
}
static class TestHandler extends AbstractHandler static class TestHandler extends AbstractHandler
{ {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -106,6 +107,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
if (!l.isRunning()) if (!l.isRunning())
start(l); start(l);
break; break;
case AUTO: case AUTO:
if (l.isRunning()) if (l.isRunning())
unmanage(b); unmanage(b);
@ -115,6 +117,9 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
start(l); start(l);
} }
break; break;
default:
break;
} }
} }
} }
@ -154,14 +159,23 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
super.doStop(); super.doStop();
List<Bean> reverse = new ArrayList<>(_beans); List<Bean> reverse = new ArrayList<>(_beans);
Collections.reverse(reverse); Collections.reverse(reverse);
MultiException mex = new MultiException();
for (Bean b : reverse) for (Bean b : reverse)
{ {
if (b._managed==Managed.MANAGED && b._bean instanceof LifeCycle) if (b._managed==Managed.MANAGED && b._bean instanceof LifeCycle)
{ {
LifeCycle l = (LifeCycle)b._bean; LifeCycle l = (LifeCycle)b._bean;
stop(l); try
{
stop(l);
}
catch (Throwable th)
{
mex.add(th);
}
} }
} }
mex.ifExceptionThrow();
} }
/** /**
@ -178,7 +192,14 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
if (b._bean instanceof Destroyable && (b._managed==Managed.MANAGED || b._managed==Managed.POJO)) if (b._bean instanceof Destroyable && (b._managed==Managed.MANAGED || b._managed==Managed.POJO))
{ {
Destroyable d = (Destroyable)b._bean; Destroyable d = (Destroyable)b._bean;
d.destroy(); try
{
d.destroy();
}
catch(Throwable th)
{
LOG.warn(th);
}
} }
} }
_beans.clear(); _beans.clear();