jetty-9 - Throwing EofException when trying to flush after output shutdown.
When the output is shutdown, field _cannotAcceptMoreAppDataToFlush is set to true, indicating that no more flush() calls are accepted. However, a further call to flush() was reading _cannotAcceptMoreAppDataToFlush and just returning false, leaving writes in pending state but without chance to be completed. Now, if we detect _cannotAcceptMoreAppDataToFlush==true in flush(), we check whether the output was shutdown; if it is the case, we throw an EofException to signal to the application that it cannot write because the connection is already closed.
This commit is contained in:
parent
655a313fdf
commit
8c9f097666
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.io.ssl;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
|
@ -79,6 +80,7 @@ import org.eclipse.jetty.util.log.Logger;
|
||||||
public class SslConnection extends AbstractConnection
|
public class SslConnection extends AbstractConnection
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(SslConnection.class);
|
private static final Logger LOG = Log.getLogger(SslConnection.class);
|
||||||
|
private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
|
||||||
private static final ByteBuffer __FILL_CALLED_FLUSH= BufferUtil.allocate(0);
|
private static final ByteBuffer __FILL_CALLED_FLUSH= BufferUtil.allocate(0);
|
||||||
private static final ByteBuffer __FLUSH_CALLED_FILL= BufferUtil.allocate(0);
|
private static final ByteBuffer __FLUSH_CALLED_FILL= BufferUtil.allocate(0);
|
||||||
private final ByteBufferPool _bufferPool;
|
private final ByteBufferPool _bufferPool;
|
||||||
|
@ -189,6 +191,7 @@ public class SslConnection extends AbstractConnection
|
||||||
// to do the fill and/or flush again and these calls will do the actually
|
// to do the fill and/or flush again and these calls will do the actually
|
||||||
// filling.
|
// filling.
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("onFillable enter {}", getEndPoint());
|
LOG.debug("onFillable enter {}", getEndPoint());
|
||||||
|
|
||||||
// wake up whoever is doing the fill or the flush so they can
|
// wake up whoever is doing the fill or the flush so they can
|
||||||
|
@ -205,6 +208,7 @@ public class SslConnection extends AbstractConnection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("onFillable exit {}", getEndPoint());
|
LOG.debug("onFillable exit {}", getEndPoint());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,6 +272,7 @@ public class SslConnection extends AbstractConnection
|
||||||
// data. In either case the appropriate callback is passed on.
|
// data. In either case the appropriate callback is passed on.
|
||||||
synchronized (DecryptedEndPoint.this)
|
synchronized (DecryptedEndPoint.this)
|
||||||
{
|
{
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("write.complete {}", SslConnection.this.getEndPoint());
|
LOG.debug("write.complete {}", SslConnection.this.getEndPoint());
|
||||||
|
|
||||||
releaseEncryptedOutputBuffer();
|
releaseEncryptedOutputBuffer();
|
||||||
|
@ -292,6 +297,7 @@ public class SslConnection extends AbstractConnection
|
||||||
// data. In either case the appropriate callback is passed on.
|
// data. In either case the appropriate callback is passed on.
|
||||||
synchronized (DecryptedEndPoint.this)
|
synchronized (DecryptedEndPoint.this)
|
||||||
{
|
{
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} write.failed", SslConnection.this, x);
|
LOG.debug("{} write.failed", SslConnection.this, x);
|
||||||
if (_encryptedOutput != null)
|
if (_encryptedOutput != null)
|
||||||
BufferUtil.clear(_encryptedOutput);
|
BufferUtil.clear(_encryptedOutput);
|
||||||
|
@ -345,6 +351,7 @@ public class SslConnection extends AbstractConnection
|
||||||
// if neither then we should just try the flush again.
|
// if neither then we should just try the flush again.
|
||||||
synchronized (DecryptedEndPoint.this)
|
synchronized (DecryptedEndPoint.this)
|
||||||
{
|
{
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("onIncompleteFlush {}", getEndPoint());
|
LOG.debug("onIncompleteFlush {}", getEndPoint());
|
||||||
// If we have pending output data,
|
// If we have pending output data,
|
||||||
if (BufferUtil.hasContent(_encryptedOutput))
|
if (BufferUtil.hasContent(_encryptedOutput))
|
||||||
|
@ -447,6 +454,7 @@ public class SslConnection extends AbstractConnection
|
||||||
@Override
|
@Override
|
||||||
public synchronized int fill(ByteBuffer buffer) throws IOException
|
public synchronized int fill(ByteBuffer buffer) throws IOException
|
||||||
{
|
{
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} fill enter", SslConnection.this);
|
LOG.debug("{} fill enter", SslConnection.this);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -474,6 +482,7 @@ public class SslConnection extends AbstractConnection
|
||||||
{
|
{
|
||||||
// Let's try reading some encrypted data... even if we have some already.
|
// Let's try reading some encrypted data... even if we have some already.
|
||||||
int net_filled = getEndPoint().fill(_encryptedInput);
|
int net_filled = getEndPoint().fill(_encryptedInput);
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} filled {} encrypted bytes", SslConnection.this, net_filled);
|
LOG.debug("{} filled {} encrypted bytes", SslConnection.this, net_filled);
|
||||||
if (net_filled > 0)
|
if (net_filled > 0)
|
||||||
_underFlown = false;
|
_underFlown = false;
|
||||||
|
@ -483,6 +492,7 @@ public class SslConnection extends AbstractConnection
|
||||||
int pos = BufferUtil.flipToFill(app_in);
|
int pos = BufferUtil.flipToFill(app_in);
|
||||||
SSLEngineResult unwrapResult = _sslEngine.unwrap(_encryptedInput, app_in);
|
SSLEngineResult unwrapResult = _sslEngine.unwrap(_encryptedInput, app_in);
|
||||||
BufferUtil.flipToFlush(app_in, pos);
|
BufferUtil.flipToFlush(app_in, pos);
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} unwrap {}", SslConnection.this, unwrapResult);
|
LOG.debug("{} unwrap {}", SslConnection.this, unwrapResult);
|
||||||
|
|
||||||
// and deal with the results
|
// and deal with the results
|
||||||
|
@ -614,7 +624,6 @@ public class SslConnection extends AbstractConnection
|
||||||
catch (SSLException e)
|
catch (SSLException e)
|
||||||
{
|
{
|
||||||
getEndPoint().close();
|
getEndPoint().close();
|
||||||
LOG.debug(e);
|
|
||||||
throw new EofException(e);
|
throw new EofException(e);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
@ -641,6 +650,7 @@ public class SslConnection extends AbstractConnection
|
||||||
_bufferPool.release(_decryptedInput);
|
_bufferPool.release(_decryptedInput);
|
||||||
_decryptedInput = null;
|
_decryptedInput = null;
|
||||||
}
|
}
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} fill exit", SslConnection.this);
|
LOG.debug("{} fill exit", SslConnection.this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -655,12 +665,17 @@ public class SslConnection extends AbstractConnection
|
||||||
// it is the applications responsibility to call flush again - either in a busy loop
|
// it is the applications responsibility to call flush again - either in a busy loop
|
||||||
// or better yet by using EndPoint#write to do the flushing.
|
// or better yet by using EndPoint#write to do the flushing.
|
||||||
|
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} flush enter {}", SslConnection.this, Arrays.toString(appOuts));
|
LOG.debug("{} flush enter {}", SslConnection.this, Arrays.toString(appOuts));
|
||||||
int consumed=0;
|
int consumed=0;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_cannotAcceptMoreAppDataToFlush)
|
if (_cannotAcceptMoreAppDataToFlush)
|
||||||
|
{
|
||||||
|
if (_sslEngine.isOutboundDone())
|
||||||
|
throw new EofException(new ClosedChannelException());
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// We will need a network buffer
|
// We will need a network buffer
|
||||||
if (_encryptedOutput == null)
|
if (_encryptedOutput == null)
|
||||||
|
@ -673,6 +688,7 @@ public class SslConnection extends AbstractConnection
|
||||||
BufferUtil.compact(_encryptedOutput);
|
BufferUtil.compact(_encryptedOutput);
|
||||||
int pos = BufferUtil.flipToFill(_encryptedOutput);
|
int pos = BufferUtil.flipToFill(_encryptedOutput);
|
||||||
SSLEngineResult wrapResult = _sslEngine.wrap(appOuts, _encryptedOutput);
|
SSLEngineResult wrapResult = _sslEngine.wrap(appOuts, _encryptedOutput);
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} wrap {}", SslConnection.this, wrapResult);
|
LOG.debug("{} wrap {}", SslConnection.this, wrapResult);
|
||||||
BufferUtil.flipToFlush(_encryptedOutput, pos);
|
BufferUtil.flipToFlush(_encryptedOutput, pos);
|
||||||
if (wrapResult.bytesConsumed()>0)
|
if (wrapResult.bytesConsumed()>0)
|
||||||
|
@ -711,7 +727,7 @@ public class SslConnection extends AbstractConnection
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
|
|
||||||
default:
|
default:
|
||||||
if (LOG.isDebugEnabled())
|
if (DEBUG)
|
||||||
LOG.debug("{} {} {}", this, wrapResult.getStatus(), BufferUtil.toDetailString(_encryptedOutput));
|
LOG.debug("{} {} {}", this, wrapResult.getStatus(), BufferUtil.toDetailString(_encryptedOutput));
|
||||||
|
|
||||||
// if we have net bytes, let's try to flush them
|
// if we have net bytes, let's try to flush them
|
||||||
|
@ -759,6 +775,7 @@ public class SslConnection extends AbstractConnection
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
if (DEBUG)
|
||||||
LOG.debug("{} flush exit, consumed {}", SslConnection.this, consumed);
|
LOG.debug("{} flush exit, consumed {}", SslConnection.this, consumed);
|
||||||
releaseEncryptedOutputBuffer();
|
releaseEncryptedOutputBuffer();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue