Jetty9 - Introduced Connection.close() for better stop of lifecycle components.

This commit is contained in:
Simone Bordet 2012-08-10 10:32:45 +02:00
parent cf5801f437
commit 2d832fa9ad
4 changed files with 96 additions and 34 deletions

View File

@ -34,21 +34,21 @@ public abstract class AbstractConnection implements Connection
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final AtomicBoolean _readInterested = new AtomicBoolean();
private final EndPoint _endp;
private final Callback<Void> _readCallback;
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback<Void> _readCallback;
public AbstractConnection(EndPoint endp, Executor executor)
{
this(endp, executor, false);
this(endp, executor, true);
}
public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnlyFailure)
public AbstractConnection(EndPoint endp, Executor executor, final boolean dispatchCompletion)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_executor=executor;
_endp = endp;
_endPoint = endp;
_executor = executor;
_readCallback = new ExecutorCallback<Void>(executor)
{
@Override
@ -65,9 +65,9 @@ public abstract class AbstractConnection implements Connection
}
@Override
protected boolean execute()
protected boolean shouldDispatchCompletion()
{
return !executeOnlyFailure;
return dispatchCompletion;
}
@Override
@ -82,8 +82,7 @@ public abstract class AbstractConnection implements Connection
{
return _executor;
}
/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
@ -110,17 +109,17 @@ public abstract class AbstractConnection implements Connection
public void onFillInterestedFailed(Throwable cause)
{
LOG.debug("{} onFillInterestedFailed {}", this, cause);
if (_endp.isOpen())
if (_endPoint.isOpen())
{
boolean close = true;
if (cause instanceof TimeoutException)
close = onReadTimeout();
if (close)
{
if (_endp.isOutputShutdown())
_endp.close();
if (_endPoint.isOutputShutdown())
_endPoint.close();
else
_endp.shutdownOutput();
_endPoint.shutdownOutput();
}
}
}
@ -149,7 +148,13 @@ public abstract class AbstractConnection implements Connection
@Override
public EndPoint getEndPoint()
{
return _endp;
return _endPoint;
}
@Override
public void close()
{
getEndPoint().close();
}
@Override

View File

@ -16,27 +16,37 @@ package org.eclipse.jetty.io;
import org.eclipse.jetty.util.Callback;
/**
* <p>An {@link Connection} is associated to an {@link EndPoint} so that I/O events
* <p>A {@link Connection} is associated to an {@link EndPoint} so that I/O events
* happening on the {@link EndPoint} can be processed by the {@link Connection}.</p>
* <p>A typical implementation of {@link Connection} overrides {@link #onOpen()} to
* {@link EndPoint#fillInterested(Object, Callback) set read interest} on the {@link EndPoint},
* and when the {@link EndPoint} signals read readyness, this {@link Connection} can
* read bytes from the network and interpret them.</p>
*/
public interface Connection
public interface Connection extends AutoCloseable
{
/**
* <p>Callback method invoked when this {@link Connection} is opened.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p>
*/
void onOpen();
public void onOpen();
/**
* <p>Callback method invoked when this {@link Connection} is closed.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p>
*/
void onClose();
public void onClose();
/**
* @return the {@link EndPoint} associated with this {@link Connection}
*/
EndPoint getEndPoint();
public EndPoint getEndPoint();
/**
* <p>Performs a logical close of this connection.</p>
* <p>For simple connections, this may just mean to delegate the close to the associated
* {@link EndPoint} but, for example, SSL connections should write the SSL close message
* before closing the associated {@link EndPoint}.</p>
*/
public void close();
}

View File

@ -80,9 +80,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private ManagedSelector chooseSelector()
{
// The ++ increment here is not atomic, but it does not matter.
// The ++ increment here is not atomic, but it does not matter,
// so long as the value changes sometimes, then connections will
// be distributed over the available sets.
// be distributed over the available selectors.
long s = _selectorIndex++;
int index = (int)(s % getSelectorCount());
return _selectors[index];
@ -111,8 +111,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
public void accept(final SocketChannel channel)
{
final ManagedSelector set = chooseSelector();
set.submit(set.new Accept(channel));
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel));
}
@Override
@ -121,10 +121,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
super.doStart();
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selectSet = newSelector(i);
_selectors[i] = selectSet;
selectSet.start();
execute(selectSet);
ManagedSelector selector = newSelector(i);
_selectors[i] = selector;
selector.start();
execute(selector);
}
}
@ -142,8 +142,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
@Override
protected void doStop() throws Exception
{
for (ManagedSelector set : _selectors)
set.stop();
for (ManagedSelector selector : _selectors)
selector.stop();
super.doStop();
}
@ -172,7 +172,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just opened
*/
public void connectionOpened(final Connection connection)
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@ -182,7 +182,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just closed
*/
public void connectionClosed(final Connection connection)
public void connectionClosed(Connection connection)
{
connection.onClose();
}
@ -659,8 +659,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
{
EndPoint endpoint = (EndPoint)attachment;
endpoint.close();
EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
execute(closer);
// We are closing the SelectorManager, so we want to block the
// selector thread here until we have closed all EndPoints.
// This is different than calling close() directly, because close()
// can wait forever, while here we are limited by the stop timeout.
closer.await(getStopTimeout());
}
}
@ -684,6 +689,42 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
}
private class EndPointCloser implements Runnable
{
private final CountDownLatch latch = new CountDownLatch(1);
private final EndPoint endPoint;
private EndPointCloser(EndPoint endPoint)
{
this.endPoint = endPoint;
}
@Override
public void run()
{
try
{
endPoint.getConnection().close();
}
finally
{
latch.countDown();
}
}
private boolean await(long timeout)
{
try
{
return latch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
return false;
}
}
}
}
/**

View File

@ -105,6 +105,12 @@ public class SPDYConnection extends AbstractConnection implements Controller<Sta
return remaining - buffer.remaining();
}
@Override
public void close()
{
goAway(session);
}
@Override
public void close(boolean onlyOutput)
{