Merge remote-tracking branch 'origin/jetty-9.3.x' into jetty-9.4.x

This commit is contained in:
Greg Wilkins 2016-12-08 18:35:40 +11:00
commit 5186c1e6d0
7 changed files with 112 additions and 49 deletions

View File

@ -134,6 +134,16 @@ public abstract class AbstractConnection implements Connection
getEndPoint().fillInterested(_readCallback);
}
public void tryFillInterested()
{
tryFillInterested(_readCallback);
}
public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
}
public boolean isFillInterested()
{
return getEndPoint().isFillInterested();

View File

@ -347,12 +347,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
}
@Override
public void fillInterested(Callback callback) throws IllegalStateException
public void fillInterested(Callback callback)
{
notIdle();
_fillInterest.register(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
notIdle();
return _fillInterest.tryRegister(callback);
}
@Override
public boolean isFillInterested()
{

View File

@ -205,6 +205,14 @@ public interface EndPoint extends Closeable
*/
void fillInterested(Callback callback) throws ReadPendingException;
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param callback the callback to call when an error occurs or we are readable.
* @return true if set
*/
boolean tryFillInterested(Callback callback);
/**
* @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet
* been called

View File

@ -55,24 +55,37 @@ public abstract class FillInterest
*/
public void register(Callback callback) throws ReadPendingException
{
if (callback == null)
throw new IllegalArgumentException();
if (_interested.compareAndSet(null, callback))
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
}
}
else
if (!tryRegister(callback))
{
LOG.warn("Read pending for {} prevented {}", _interested, callback);
if (LOG.isDebugEnabled())
LOG.warn("callback set at ",_lastSet);
throw new ReadPendingException();
}
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
* returns true or eventually once {@link #fillable()} is called.
*
* @param callback the callback to register
* @return true if the register succeeded
*/
public boolean tryRegister(Callback callback)
{
if (callback == null)
throw new IllegalArgumentException();
if (!_interested.compareAndSet(null, callback))
return false;
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
}
try
{
if (LOG.isDebugEnabled())
@ -83,6 +96,8 @@ public abstract class FillInterest
{
onFail(e);
}
return true;
}
/**

View File

@ -261,14 +261,17 @@ public class SslConnection extends AbstractConnection
_decryptedEndPoint.getFillInterest().fillable();
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
boolean runComplete = false;
synchronized(_decryptedEndPoint)
{
if (_decryptedEndPoint._flushRequiresFillToProgress)
{
_decryptedEndPoint._flushRequiresFillToProgress = false;
_runCompleteWrite.run();
runComplete = true;
}
}
if (runComplete)
_runCompleteWrite.run();
if (LOG.isDebugEnabled())
LOG.debug("onFillable exit {}", _decryptedEndPoint);
@ -455,6 +458,8 @@ public class SslConnection extends AbstractConnection
// OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again.
boolean try_again = false;
boolean write = false;
boolean need_fill_interest = false;
synchronized (DecryptedEndPoint.this)
{
if (LOG.isDebugEnabled())
@ -464,15 +469,14 @@ public class SslConnection extends AbstractConnection
{
// write it
_cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput);
write = true;
}
// If we are handshaking and need to read,
else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP)
{
// check if we are actually read blocked in order to write
_flushRequiresFillToProgress = true;
ensureFillInterested();
_flushRequiresFillToProgress = true;
need_fill_interest = !SslConnection.this.isFillInterested();
}
else
{
@ -485,8 +489,11 @@ public class SslConnection extends AbstractConnection
}
}
if (try_again)
if (write)
getEndPoint().write(_writeCallback, _encryptedOutput);
else if (need_fill_interest)
ensureFillInterested();
else if (try_again)
{
// If the output is closed,
if (isOutputShutdown())
@ -502,6 +509,7 @@ public class SslConnection extends AbstractConnection
getExecutor().execute(_runCompleteWrite);
}
}
}
@Override
@ -511,11 +519,12 @@ public class SslConnection extends AbstractConnection
// method on the DecryptedEndPoint, so we have to work out if there is
// decrypted data to be filled or what callbacks to setup to be told when there
// might be more encrypted data available to attempt another call to fill
boolean fillable;
boolean write = false;
synchronized (DecryptedEndPoint.this)
{
// Do we already have some app data, then app can fill now so return true
boolean fillable = (BufferUtil.hasContent(_decryptedInput))
fillable = (BufferUtil.hasContent(_decryptedInput))
// or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill
|| BufferUtil.hasContent(_encryptedInput) && !_underFlown;
@ -534,7 +543,7 @@ public class SslConnection extends AbstractConnection
{
// write it
_cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput);
write = true;
}
else
{
@ -545,12 +554,13 @@ public class SslConnection extends AbstractConnection
}
}
}
if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
}
if (write)
getEndPoint().write(_writeCallback, _encryptedOutput);
else if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
}
@Override
@ -957,7 +967,7 @@ public class SslConnection extends AbstractConnection
_flushRequiresFillToProgress = true;
fill(__FLUSH_CALLED_FILL);
// Check if after the fill() we need to wrap again
if (handshakeStatus == HandshakeStatus.NEED_WRAP)
if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
continue;
}
return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
@ -1002,27 +1012,37 @@ public class SslConnection extends AbstractConnection
{
try
{
synchronized (this)
boolean flush = false;
boolean close = false;
synchronized (_decryptedEndPoint)
{
boolean ishut = isInputShutdown();
boolean oshut = isOutputShutdown();
if (LOG.isDebugEnabled())
LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut);
if (!oshut)
if (oshut)
return;
if (!_closedOutbound)
{
if (!_closedOutbound)
{
_closedOutbound=true; // Only attempt this once
_sslEngine.closeOutbound();
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
}
if (ishut)
getEndPoint().close();
else
ensureFillInterested();
_closedOutbound=true; // Only attempt this once
_sslEngine.closeOutbound();
flush = true;
}
// TODO review close logic here
if (ishut)
close = true;
}
if (flush)
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
if (close)
getEndPoint().close();
else
ensureFillInterested();
}
catch (Throwable x)
{
@ -1033,12 +1053,9 @@ public class SslConnection extends AbstractConnection
private void ensureFillInterested()
{
if (!SslConnection.this.isFillInterested())
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested SSL NB {}",SslConnection.this);
SslConnection.this.getEndPoint().fillInterested(_sslReadCallback);
}
if (LOG.isDebugEnabled())
LOG.debug("fillInterested SSL NB {}",SslConnection.this);
SslConnection.this.tryFillInterested(_sslReadCallback);
}
@Override
@ -1110,4 +1127,5 @@ public class SslConnection extends AbstractConnection
return super.toString()+"->"+getEndPoint().toString();
}
}
}

View File

@ -662,6 +662,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_endp.fillInterested(callback);
}
public boolean tryFillInterested(Callback callback)
{
return _endp.tryFillInterested(callback);
}
@Override
public boolean isFillInterested()
{

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
/**
* Wrapper for a {@link ServerEndpointConfig} where there PathParm information from the incoming request.
* Wrapper for a {@link ServerEndpointConfig} where there is PathParam information from the incoming request.
*/
public class PathParamServerEndpointConfig extends BasicServerEndpointConfig implements ServerEndpointConfig
{