Issue #1146 DecryptedEndPoint deadlock

This commit is contained in:
Greg Wilkins 2016-12-07 22:50:27 +11:00
parent ec0b1ea847
commit af5d27c2f7
6 changed files with 114 additions and 51 deletions

View File

@ -142,6 +142,16 @@ public abstract class AbstractConnection implements Connection
getEndPoint().fillInterested(_readCallback); getEndPoint().fillInterested(_readCallback);
} }
public void tryFillInterested()
{
tryFillInterested(_readCallback);
}
public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
}
public boolean isFillInterested() public boolean isFillInterested()
{ {
return getEndPoint().isFillInterested(); return getEndPoint().isFillInterested();

View File

@ -122,12 +122,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
} }
@Override @Override
public void fillInterested(Callback callback) throws IllegalStateException public void fillInterested(Callback callback)
{ {
notIdle(); notIdle();
_fillInterest.register(callback); _fillInterest.register(callback);
} }
@Override
public boolean tryFillInterested(Callback callback)
{
notIdle();
return _fillInterest.tryRegister(callback);
}
@Override @Override
public boolean isFillInterested() public boolean isFillInterested()
{ {

View File

@ -205,6 +205,14 @@ public interface EndPoint extends Closeable
*/ */
void fillInterested(Callback callback) throws ReadPendingException; void fillInterested(Callback callback) throws ReadPendingException;
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param callback the callback to call when an error occurs or we are readable.
* @return true if set
*/
boolean tryFillInterested(Callback callback);
/** /**
* @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet * @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet
* been called * been called

View File

@ -53,24 +53,37 @@ public abstract class FillInterest
*/ */
public void register(Callback callback) throws ReadPendingException public void register(Callback callback) throws ReadPendingException
{ {
if (callback == null) if (!tryRegister(callback))
throw new IllegalArgumentException();
if (_interested.compareAndSet(null, callback))
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
}
}
else
{ {
LOG.warn("Read pending for {} prevented {}", _interested, callback); LOG.warn("Read pending for {} prevented {}", _interested, callback);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.warn("callback set at ",_lastSet); LOG.warn("callback set at ",_lastSet);
throw new ReadPendingException(); throw new ReadPendingException();
}
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
* returns true or eventually once {@link #fillable()} is called.
*
* @param callback the callback to register
* @return true if the register succeeded
*/
public boolean tryRegister(Callback callback)
{
if (callback == null)
throw new IllegalArgumentException();
if (!_interested.compareAndSet(null, callback))
return false;
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
} }
try try
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -81,6 +94,8 @@ public abstract class FillInterest
{ {
onFail(e); onFail(e);
} }
return true;
} }
/** /**

View File

@ -223,14 +223,17 @@ public class SslConnection extends AbstractConnection
_decryptedEndPoint.getFillInterest().fillable(); _decryptedEndPoint.getFillInterest().fillable();
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
boolean runComplete = false;
synchronized(_decryptedEndPoint) synchronized(_decryptedEndPoint)
{ {
if (_decryptedEndPoint._flushRequiresFillToProgress) if (_decryptedEndPoint._flushRequiresFillToProgress)
{ {
_decryptedEndPoint._flushRequiresFillToProgress = false; _decryptedEndPoint._flushRequiresFillToProgress = false;
_runCompletWrite.run(); runComplete = true;
} }
} }
if (runComplete)
_runCompletWrite.run();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onFillable exit {}", _decryptedEndPoint); LOG.debug("onFillable exit {}", _decryptedEndPoint);
@ -390,6 +393,8 @@ public class SslConnection extends AbstractConnection
// OR if we are handshaking we need to read some encrypted data OR // OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again. // if neither then we should just try the flush again.
boolean try_again = false; boolean try_again = false;
boolean write = false;
boolean need_fill_interest = false;
synchronized (DecryptedEndPoint.this) synchronized (DecryptedEndPoint.this)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -399,15 +404,14 @@ public class SslConnection extends AbstractConnection
{ {
// write it // write it
_cannotAcceptMoreAppDataToFlush = true; _cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput); write = true;
} }
// If we are handshaking and need to read, // If we are handshaking and need to read,
else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP)
{ {
// check if we are actually read blocked in order to write // check if we are actually read blocked in order to write
_flushRequiresFillToProgress = true; _flushRequiresFillToProgress = true;
need_fill_interest = !SslConnection.this.isFillInterested();
ensureFillInterested();
} }
else else
{ {
@ -420,8 +424,11 @@ public class SslConnection extends AbstractConnection
} }
} }
if (write)
if (try_again) getEndPoint().write(_writeCallback, _encryptedOutput);
else if (need_fill_interest)
ensureFillInterested();
else if (try_again)
{ {
// If the output is closed, // If the output is closed,
if (isOutputShutdown()) if (isOutputShutdown())
@ -437,6 +444,7 @@ public class SslConnection extends AbstractConnection
getExecutor().execute(_runCompletWrite); getExecutor().execute(_runCompletWrite);
} }
} }
} }
@Override @Override
@ -446,11 +454,12 @@ public class SslConnection extends AbstractConnection
// method on the DecryptedEndPoint, so we have to work out if there is // method on the DecryptedEndPoint, so we have to work out if there is
// decrypted data to be filled or what callbacks to setup to be told when there // decrypted data to be filled or what callbacks to setup to be told when there
// might be more encrypted data available to attempt another call to fill // might be more encrypted data available to attempt another call to fill
boolean fillable;
boolean write = false;
synchronized (DecryptedEndPoint.this) synchronized (DecryptedEndPoint.this)
{ {
// Do we already have some app data, then app can fill now so return true // Do we already have some app data, then app can fill now so return true
boolean fillable = (BufferUtil.hasContent(_decryptedInput)) fillable = (BufferUtil.hasContent(_decryptedInput))
// or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill
|| BufferUtil.hasContent(_encryptedInput) && !_underFlown; || BufferUtil.hasContent(_encryptedInput) && !_underFlown;
@ -469,7 +478,7 @@ public class SslConnection extends AbstractConnection
{ {
// write it // write it
_cannotAcceptMoreAppDataToFlush = true; _cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput); write = true;
} }
else else
{ {
@ -480,12 +489,13 @@ public class SslConnection extends AbstractConnection
} }
} }
} }
if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
} }
if (write)
getEndPoint().write(_writeCallback, _encryptedOutput);
else if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
} }
@Override @Override
@ -892,7 +902,7 @@ public class SslConnection extends AbstractConnection
_flushRequiresFillToProgress = true; _flushRequiresFillToProgress = true;
fill(__FLUSH_CALLED_FILL); fill(__FLUSH_CALLED_FILL);
// Check if after the fill() we need to wrap again // Check if after the fill() we need to wrap again
if (handshakeStatus == HandshakeStatus.NEED_WRAP) if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
continue; continue;
} }
return allConsumed && BufferUtil.isEmpty(_encryptedOutput); return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
@ -937,27 +947,37 @@ public class SslConnection extends AbstractConnection
{ {
try try
{ {
synchronized (this) boolean flush = false;
boolean close = false;
synchronized (_decryptedEndPoint)
{ {
boolean ishut = isInputShutdown(); boolean ishut = isInputShutdown();
boolean oshut = isOutputShutdown(); boolean oshut = isOutputShutdown();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut);
if (!oshut) if (oshut)
return;
if (!_closedOutbound)
{ {
if (!_closedOutbound) _closedOutbound=true; // Only attempt this once
{ _sslEngine.closeOutbound();
_closedOutbound=true; // Only attempt this once flush = true;
_sslEngine.closeOutbound();
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
}
if (ishut)
getEndPoint().close();
else
ensureFillInterested();
} }
// TODO review close logic here
if (ishut)
close = true;
} }
if (flush)
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
if (close)
getEndPoint().close();
else
ensureFillInterested();
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -968,16 +988,13 @@ public class SslConnection extends AbstractConnection
private void ensureFillInterested() private void ensureFillInterested()
{ {
if (!SslConnection.this.isFillInterested()) if (getFillInterest().isCallbackNonBlocking())
{ {
if (getFillInterest().isCallbackNonBlocking()) SslConnection.this.tryFillInterested(_nonBlockingReadCallback);
{ }
SslConnection.this.getEndPoint().fillInterested(_nonBlockingReadCallback); else
} {
else SslConnection.this.tryFillInterested();
{
SslConnection.this.fillInterested();
}
} }
} }
@ -1058,4 +1075,5 @@ public class SslConnection extends AbstractConnection
return super.toString()+"->"+getEndPoint().toString(); return super.toString()+"->"+getEndPoint().toString();
} }
} }
} }

View File

@ -303,6 +303,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_endp.fillInterested(callback); _endp.fillInterested(callback);
} }
public boolean tryFillInterested(Callback callback)
{
return _endp.tryFillInterested(callback);
}
@Override @Override
public boolean isFillInterested() public boolean isFillInterested()
{ {