Jetty 9.4.x 3038 ssl connection leak (#3121)

Issue #3038 - SSL connection leak.

Fixed SSL spin caused when fill had NEED_WRAP, but a flush/wrap
produced 0 bytes and stayed in NEED_WRAP

Removed check of isInputShutdown prior to filling that allowed EOF to
overtake data already read.

Fix for leak by shutting down output in HttpConnection if
filled -1 and the HttpChannelState was no longer processing
current request.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2018-11-20 13:53:42 +01:00 committed by GitHub
parent 10622f3455
commit 8c4ee8496f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 507 additions and 168 deletions

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client.http;
import java.io.EOFException; import java.io.EOFException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
@ -39,6 +40,7 @@ import org.eclipse.jetty.util.CompletableCallback;
public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler
{ {
private final AtomicReference<ContentState> handlingContent = new AtomicReference<>(ContentState.IDLE);
private final HttpParser parser; private final HttpParser parser;
private ByteBuffer buffer; private ByteBuffer buffer;
private boolean shutdown; private boolean shutdown;
@ -263,8 +265,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null) if (exchange == null)
return false; return false;
handlingContent.set(ContentState.CONTENT);
CompletableCallback callback = new CompletableCallback() CompletableCallback callback = new CompletableCallback()
{ {
@Override
public void succeeded()
{
boolean messageComplete = !handlingContent.compareAndSet(ContentState.CONTENT, ContentState.IDLE);
super.succeeded();
if (messageComplete)
messageComplete();
}
@Override @Override
public void resume() public void resume()
{ {
@ -304,6 +316,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
@Override @Override
public boolean messageComplete() public boolean messageComplete()
{ {
if (handlingContent.compareAndSet(ContentState.CONTENT, ContentState.COMPLETE))
return false;
HttpExchange exchange = getHttpExchange(); HttpExchange exchange = getHttpExchange();
if (exchange == null) if (exchange == null)
return false; return false;
@ -375,4 +390,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{ {
return String.format("%s[%s]", super.toString(), parser); return String.format("%s[%s]", super.toString(), parser);
} }
private enum ContentState { IDLE, CONTENT, COMPLETE }
} }

View File

@ -767,7 +767,7 @@ public class HttpParser
case LF: case LF:
setState(State.HEADER); setState(State.HEADER);
handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle; handle |= _responseHandler.startResponse(_version, _responseStatus, null);
break; break;
default: default:
@ -789,7 +789,7 @@ public class HttpParser
handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9); handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9);
setState(State.END); setState(State.END);
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
handle= handleHeaderContentMessage() || handle; handle |= handleHeaderContentMessage();
break; break;
case ALPHA: case ALPHA:
@ -865,7 +865,7 @@ public class HttpParser
if (_responseHandler!=null) if (_responseHandler!=null)
{ {
setState(State.HEADER); setState(State.HEADER);
handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle; handle |= _responseHandler.startResponse(_version, _responseStatus, null);
} }
else else
{ {
@ -876,7 +876,7 @@ public class HttpParser
handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9); handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9);
setState(State.END); setState(State.END);
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
handle= handleHeaderContentMessage() || handle; handle |= handleHeaderContentMessage();
} }
break; break;
@ -905,7 +905,7 @@ public class HttpParser
setState(State.HEADER); setState(State.HEADER);
handle=_requestHandler.startRequest(_methodString,_uri.toString(), _version)||handle; handle |= _requestHandler.startRequest(_methodString,_uri.toString(), _version);
continue; continue;
case ALPHA: case ALPHA:
@ -927,7 +927,7 @@ public class HttpParser
case LF: case LF:
String reason=takeString(); String reason=takeString();
setState(State.HEADER); setState(State.HEADER);
handle=_responseHandler.startResponse(_version, _responseStatus, reason)||handle; handle |= _responseHandler.startResponse(_version, _responseStatus, reason);
continue; continue;
case ALPHA: case ALPHA:
@ -1660,14 +1660,16 @@ public class HttpParser
_contentPosition += _contentChunk.remaining(); _contentPosition += _contentChunk.remaining();
buffer.position(buffer.position()+_contentChunk.remaining()); buffer.position(buffer.position()+_contentChunk.remaining());
if (_handler.content(_contentChunk)) boolean handle = _handler.content(_contentChunk);
return true;
if(_contentPosition == _contentLength) if(_contentPosition == _contentLength)
{ {
setState(State.END); setState(State.END);
return handleContentMessage(); boolean handleContent = handleContentMessage();
return handle || handleContent;
} }
else if (handle)
return true;
} }
break; break;
} }
@ -1808,7 +1810,6 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public boolean isAtEOF() public boolean isAtEOF()
{ {
return _eof; return _eof;
} }

View File

@ -27,7 +27,6 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -110,13 +109,28 @@ public class HttpTester
public static Response parseResponse(InputStream responseStream) throws IOException public static Response parseResponse(InputStream responseStream) throws IOException
{ {
ByteArrayOutputStream contentStream = new ByteArrayOutputStream();
IO.copy(responseStream, contentStream);
Response r=new Response(); Response r=new Response();
HttpParser parser =new HttpParser(r); HttpParser parser =new HttpParser(r);
parser.parseNext(ByteBuffer.wrap(contentStream.toByteArray()));
// Read and parse a character at a time so we never can read more than we should.
byte[] array = new byte[1];
ByteBuffer buffer = ByteBuffer.wrap(array);
buffer.limit(1);
while(true)
{
buffer.position(1);
int l = responseStream.read(array);
if (l<0)
parser.atEOF();
else
buffer.position(0);
if (parser.parseNext(buffer))
return r; return r;
else if (l<0)
return null;
}
} }
public abstract static class Input public abstract static class Input

View File

@ -126,13 +126,13 @@ public class SslConnection extends AbstractConnection
protected RunnableTask(String op) protected RunnableTask(String op)
{ {
_operation=op; _operation = op;
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("SSL:%s:%s:%s",SslConnection.this,_operation,getInvocationType()); return String.format("SSL:%s:%s:%s", SslConnection.this, _operation, getInvocationType());
} }
} }
@ -174,7 +174,7 @@ public class SslConnection extends AbstractConnection
@Override @Override
public String toString() public String toString()
{ {
return String.format("SSLC.NBReadCB@%x{%s}", SslConnection.this.hashCode(),SslConnection.this); return String.format("SSLC.NBReadCB@%x{%s}", SslConnection.this.hashCode(), SslConnection.this);
} }
}; };
@ -311,26 +311,26 @@ public class SslConnection extends AbstractConnection
@Override @Override
public void onFillInterestedFailed(Throwable cause) public void onFillInterestedFailed(Throwable cause)
{ {
_decryptedEndPoint.onFillableFail(cause==null?new IOException():cause); _decryptedEndPoint.onFillableFail(cause == null ? new IOException() : cause);
} }
@Override @Override
public String toConnectionString() public String toConnectionString()
{ {
ByteBuffer b = _encryptedInput; ByteBuffer b = _encryptedInput;
int ei=b==null?-1:b.remaining(); int ei = b == null ? -1 : b.remaining();
b = _encryptedOutput; b = _encryptedOutput;
int eo=b==null?-1:b.remaining(); int eo = b == null ? -1 : b.remaining();
b = _decryptedInput; b = _decryptedInput;
int di=b==null?-1:b.remaining(); int di = b == null ? -1 : b.remaining();
Connection connection = _decryptedEndPoint.getConnection(); Connection connection = _decryptedEndPoint.getConnection();
return String.format("%s@%x{%s,eio=%d/%d,di=%d,fill=%s,flush=%s}~>%s=>%s", return String.format("%s@%x{%s,eio=%d/%d,di=%d,fill=%s,flush=%s}~>%s=>%s",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
_sslEngine.getHandshakeStatus(), _sslEngine.getHandshakeStatus(),
ei,eo,di, ei, eo, di,
_fillState,_flushState, _fillState, _flushState,
_decryptedEndPoint.toEndPointString(), _decryptedEndPoint.toEndPointString(),
connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection); connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection);
} }
@ -399,22 +399,22 @@ public class SslConnection extends AbstractConnection
{ {
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
boolean waiting_for_fill; boolean waiting_for_fill;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onFillable {}", SslConnection.this); LOG.debug("onFillable {}", SslConnection.this);
_fillState = FillState.IDLE; _fillState = FillState.IDLE;
waiting_for_fill = _flushState==FlushState.WAIT_FOR_FILL; waiting_for_fill = _flushState == FlushState.WAIT_FOR_FILL;
} }
getFillInterest().fillable(); getFillInterest().fillable();
if (waiting_for_fill) if (waiting_for_fill)
{ {
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
waiting_for_fill = _flushState==FlushState.WAIT_FOR_FILL; waiting_for_fill = _flushState == FlushState.WAIT_FOR_FILL;
} }
if (waiting_for_fill) if (waiting_for_fill)
fill(BufferUtil.EMPTY_BUFFER); fill(BufferUtil.EMPTY_BUFFER);
@ -430,13 +430,13 @@ public class SslConnection extends AbstractConnection
{ {
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
boolean fail = false; boolean fail = false;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onFillableFail {}", SslConnection.this, failure); LOG.debug("onFillableFail {}", SslConnection.this, failure);
_fillState = FillState.IDLE; _fillState = FillState.IDLE;
switch(_flushState) switch (_flushState)
{ {
case WAIT_FOR_FILL: case WAIT_FOR_FILL:
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
@ -464,7 +464,7 @@ public class SslConnection extends AbstractConnection
if (connection instanceof AbstractConnection) if (connection instanceof AbstractConnection)
{ {
AbstractConnection a = (AbstractConnection)connection; AbstractConnection a = (AbstractConnection)connection;
if (a.getInputBufferSize()<_sslEngine.getSession().getApplicationBufferSize()) if (a.getInputBufferSize() < _sslEngine.getSession().getApplicationBufferSize())
a.setInputBufferSize(_sslEngine.getSession().getApplicationBufferSize()); a.setInputBufferSize(_sslEngine.getSession().getApplicationBufferSize());
} }
super.setConnection(connection); super.setConnection(connection);
@ -480,7 +480,7 @@ public class SslConnection extends AbstractConnection
{ {
try try
{ {
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(">fill {}", SslConnection.this); LOG.debug(">fill {}", SslConnection.this);
@ -488,12 +488,12 @@ public class SslConnection extends AbstractConnection
int filled = -2; int filled = -2;
try try
{ {
if (_fillState!=FillState.IDLE) if (_fillState != FillState.IDLE)
return filled = 0; return filled = 0;
// Do we already have some decrypted data? // Do we already have some decrypted data?
if (BufferUtil.hasContent(_decryptedInput)) if (BufferUtil.hasContent(_decryptedInput))
return filled = BufferUtil.append(buffer,_decryptedInput); return filled = BufferUtil.append(buffer, _decryptedInput);
// loop filling and unwrapping until we have something // loop filling and unwrapping until we have something
while (true) while (true)
@ -501,7 +501,7 @@ public class SslConnection extends AbstractConnection
HandshakeStatus status = _sslEngine.getHandshakeStatus(); HandshakeStatus status = _sslEngine.getHandshakeStatus();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("fill {}", status); LOG.debug("fill {}", status);
switch(status) switch (status)
{ {
case NEED_UNWRAP: case NEED_UNWRAP:
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
@ -512,8 +512,13 @@ public class SslConnection extends AbstractConnection
continue; continue;
case NEED_WRAP: case NEED_WRAP:
if (_flushState==FlushState.IDLE && flush(BufferUtil.EMPTY_BUFFER)) if (_flushState == FlushState.IDLE && flush(BufferUtil.EMPTY_BUFFER))
{
if (_sslEngine.isInboundDone())
// TODO this is probably a JVM bug, work around it by -1
return -1;
continue; continue;
}
// handle in needsFillInterest // handle in needsFillInterest
return filled = 0; return filled = 0;
@ -521,7 +526,7 @@ public class SslConnection extends AbstractConnection
throw new IllegalStateException("Unexpected HandshakeStatus " + status); throw new IllegalStateException("Unexpected HandshakeStatus " + status);
} }
if (_encryptedInput==null) if (_encryptedInput == null)
_encryptedInput = _bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize(), _encryptedDirectBuffers); _encryptedInput = _bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize(), _encryptedDirectBuffers);
// can we use the passed buffer if it is big enough // can we use the passed buffer if it is big enough
@ -561,11 +566,11 @@ public class SslConnection extends AbstractConnection
{ {
BufferUtil.flipToFlush(app_in, pos); BufferUtil.flipToFlush(app_in, pos);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unwrap {} {} unwrapBuffer={} appBuffer={}", LOG.debug("unwrap net_filled={} {} encryptedBuffer={} unwrapBuffer={} appBuffer={}",
net_filled, net_filled,
unwrapResult.toString().replace('\n',' '), unwrapResult.toString().replace('\n', ' '),
BufferUtil.toSummaryString(_encryptedInput),
BufferUtil.toDetailString(app_in), BufferUtil.toDetailString(app_in),
BufferUtil.toDetailString(buffer)); BufferUtil.toDetailString(buffer));
@ -573,7 +578,7 @@ public class SslConnection extends AbstractConnection
// Extra check on unwrapResultStatus == OK with zero bytes consumed // Extra check on unwrapResultStatus == OK with zero bytes consumed
// or produced is due to an SSL client on Android (see bug #454773). // or produced is due to an SSL client on Android (see bug #454773).
if (unwrap==Status.OK && unwrapResult.bytesConsumed() == 0 && unwrapResult.bytesProduced() == 0) if (unwrap == Status.OK && unwrapResult.bytesConsumed() == 0 && unwrapResult.bytesProduced() == 0)
unwrap = Status.BUFFER_UNDERFLOW; unwrap = Status.BUFFER_UNDERFLOW;
switch (unwrap) switch (unwrap)
@ -605,9 +610,9 @@ public class SslConnection extends AbstractConnection
// another call to fill() or flush(). // another call to fill() or flush().
if (unwrapResult.bytesProduced() > 0) if (unwrapResult.bytesProduced() > 0)
{ {
if (app_in==buffer) if (app_in == buffer)
return filled = unwrapResult.bytesProduced(); return filled = unwrapResult.bytesProduced();
return filled = BufferUtil.append(buffer,_decryptedInput); return filled = BufferUtil.append(buffer, _decryptedInput);
} }
break; break;
@ -622,10 +627,10 @@ public class SslConnection extends AbstractConnection
{ {
handshakeFailed(x); handshakeFailed(x);
if (_flushState==FlushState.WAIT_FOR_FILL) if (_flushState == FlushState.WAIT_FOR_FILL)
{ {
_flushState=FlushState.IDLE; _flushState = FlushState.IDLE;
getExecutor().execute(()->_decryptedEndPoint.getWriteFlusher().onFail(x)); getExecutor().execute(() -> _decryptedEndPoint.getWriteFlusher().onFail(x));
} }
throw x; throw x;
@ -644,10 +649,10 @@ public class SslConnection extends AbstractConnection
_decryptedInput = null; _decryptedInput = null;
} }
if (_flushState==FlushState.WAIT_FOR_FILL) if (_flushState == FlushState.WAIT_FOR_FILL)
{ {
_flushState=FlushState.IDLE; _flushState = FlushState.IDLE;
getExecutor().execute(()->_decryptedEndPoint.getWriteFlusher().completeWrite()); getExecutor().execute(() -> _decryptedEndPoint.getWriteFlusher().completeWrite());
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -672,15 +677,15 @@ public class SslConnection extends AbstractConnection
boolean fillable; boolean fillable;
ByteBuffer write = null; ByteBuffer write = null;
boolean interest = false; boolean interest = false;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug(">needFillInterest uf={} {}", _underflown, SslConnection.this); LOG.debug(">needFillInterest uf={} {}", _underflown, SslConnection.this);
LOG.debug("ei={} di={}",BufferUtil.toDetailString(_encryptedInput),BufferUtil.toDetailString(_decryptedInput)); LOG.debug("ei={} di={}", BufferUtil.toDetailString(_encryptedInput), BufferUtil.toDetailString(_decryptedInput));
} }
if (_fillState!=FillState.IDLE) if (_fillState != FillState.IDLE)
return; return;
// Fillable if we have decrypted Input OR encrypted input that has not yet been underflown. // Fillable if we have decrypted Input OR encrypted input that has not yet been underflown.
@ -720,10 +725,10 @@ public class SslConnection extends AbstractConnection
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("<needFillInterest s={}/{} f={} i={} w={}",_flushState,_fillState,fillable,interest,BufferUtil.toDetailString(write)); LOG.debug("<needFillInterest s={}/{} f={} i={} w={}", _flushState, _fillState, fillable, interest, BufferUtil.toDetailString(write));
} }
if (write!=null) if (write != null)
getEndPoint().write(_incompleteWriteCallback, write); getEndPoint().write(_incompleteWriteCallback, write);
else if (fillable) else if (fillable)
getExecutor().execute(_runFillable); getExecutor().execute(_runFillable);
@ -744,14 +749,14 @@ public class SslConnection extends AbstractConnection
if (_handshake.compareAndSet(Handshake.INITIAL, Handshake.SUCCEEDED)) if (_handshake.compareAndSet(Handshake.INITIAL, Handshake.SUCCEEDED))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("handshake succeeded {} {} {}/{}",SslConnection.this, LOG.debug("handshake succeeded {} {} {}/{}", SslConnection.this,
_sslEngine.getUseClientMode() ? "client" : "resumed server", _sslEngine.getUseClientMode() ? "client" : "resumed server",
_sslEngine.getSession().getProtocol(),_sslEngine.getSession().getCipherSuite()); _sslEngine.getSession().getProtocol(), _sslEngine.getSession().getCipherSuite());
notifyHandshakeSucceeded(_sslEngine); notifyHandshakeSucceeded(_sslEngine);
} }
else if (_handshake.get() == Handshake.SUCCEEDED) else if (_handshake.get() == Handshake.SUCCEEDED)
{ {
if (_renegotiationLimit>0) if (_renegotiationLimit > 0)
_renegotiationLimit--; _renegotiationLimit--;
} }
} }
@ -805,12 +810,12 @@ public class SslConnection extends AbstractConnection
{ {
try try
{ {
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug(">flush {}", SslConnection.this); LOG.debug(">flush {}", SslConnection.this);
int i=0; int i = 0;
for (ByteBuffer b : appOuts) for (ByteBuffer b : appOuts)
LOG.debug("flush b[{}]={}", i++, BufferUtil.toDetailString(b)); LOG.debug("flush b[{}]={}", i++, BufferUtil.toDetailString(b));
} }
@ -818,7 +823,7 @@ public class SslConnection extends AbstractConnection
Boolean result = null; Boolean result = null;
try try
{ {
if (_flushState!=FlushState.IDLE) if (_flushState != FlushState.IDLE)
return result = false; return result = false;
// Keep going while we can make progress or until we are done // Keep going while we can make progress or until we are done
@ -826,8 +831,8 @@ public class SslConnection extends AbstractConnection
{ {
HandshakeStatus status = _sslEngine.getHandshakeStatus(); HandshakeStatus status = _sslEngine.getHandshakeStatus();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("flush {}",status); LOG.debug("flush {}", status);
switch(status) switch (status)
{ {
case NEED_WRAP: case NEED_WRAP:
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
@ -838,10 +843,10 @@ public class SslConnection extends AbstractConnection
continue; continue;
case NEED_UNWRAP: case NEED_UNWRAP:
if (_fillState==FillState.IDLE) if (_fillState == FillState.IDLE)
{ {
int filled = fill(BufferUtil.EMPTY_BUFFER); int filled = fill(BufferUtil.EMPTY_BUFFER);
if (_sslEngine.getHandshakeStatus()!=status) if (_sslEngine.getHandshakeStatus() != status)
continue; continue;
if (filled < 0) if (filled < 0)
throw new IOException("Broken pipe"); throw new IOException("Broken pipe");
@ -862,19 +867,23 @@ public class SslConnection extends AbstractConnection
try try
{ {
wrapResult = _sslEngine.wrap(appOuts, _encryptedOutput); wrapResult = _sslEngine.wrap(appOuts, _encryptedOutput);
if (LOG.isDebugEnabled())
LOG.debug("wrap {} {}", wrapResult.toString().replace('\n',' '), BufferUtil.toHexSummary(_encryptedOutput));
} }
finally finally
{ {
BufferUtil.flipToFlush(_encryptedOutput, pos); BufferUtil.flipToFlush(_encryptedOutput, pos);
} }
if (LOG.isDebugEnabled())
LOG.debug("wrap {} {} ioDone={}/{}",
wrapResult.toString().replace('\n', ' '),
BufferUtil.toSummaryString(_encryptedOutput),
_sslEngine.isInboundDone(),
_sslEngine.isOutboundDone());
// Was all the data consumed? // Was all the data consumed?
boolean allConsumed=true; boolean allConsumed = true;
for (ByteBuffer b : appOuts) for (ByteBuffer b : appOuts)
if (BufferUtil.hasContent(b)) if (BufferUtil.hasContent(b))
allConsumed=false; allConsumed = false;
// if we have net bytes, let's try to flush them // if we have net bytes, let's try to flush them
boolean flushed = true; boolean flushed = true;
@ -960,24 +969,24 @@ public class SslConnection extends AbstractConnection
{ {
boolean fillInterest = false; boolean fillInterest = false;
ByteBuffer write = null; ByteBuffer write = null;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(">onIncompleteFlush {} {}", SslConnection.this, BufferUtil.toDetailString(_encryptedOutput)); LOG.debug(">onIncompleteFlush {} {}", SslConnection.this, BufferUtil.toDetailString(_encryptedOutput));
if (_flushState!=FlushState.IDLE) if (_flushState != FlushState.IDLE)
return; return;
while(true) while (true)
{ {
HandshakeStatus status = _sslEngine.getHandshakeStatus(); HandshakeStatus status = _sslEngine.getHandshakeStatus();
switch(status) switch (status)
{ {
case NEED_TASK: case NEED_TASK:
case NEED_WRAP: case NEED_WRAP:
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
// write what we have or an empty buffer to reschedule a call to flush // write what we have or an empty buffer to reschedule a call to flush
write = BufferUtil.hasContent(_encryptedOutput)?_encryptedOutput:BufferUtil.EMPTY_BUFFER; write = BufferUtil.hasContent(_encryptedOutput) ? _encryptedOutput : BufferUtil.EMPTY_BUFFER;
_flushState = FlushState.WRITING; _flushState = FlushState.WRITING;
break; break;
@ -990,7 +999,7 @@ public class SslConnection extends AbstractConnection
break; break;
} }
if (_fillState!=FillState.IDLE) if (_fillState != FillState.IDLE)
{ {
// Wait for a fill that is happening anyway // Wait for a fill that is happening anyway
_flushState = FlushState.WAIT_FOR_FILL; _flushState = FlushState.WAIT_FOR_FILL;
@ -1002,12 +1011,12 @@ public class SslConnection extends AbstractConnection
{ {
int filled = fill(BufferUtil.EMPTY_BUFFER); int filled = fill(BufferUtil.EMPTY_BUFFER);
// If this changed the status, let's try again // If this changed the status, let's try again
if (_sslEngine.getHandshakeStatus()!=status) if (_sslEngine.getHandshakeStatus() != status)
continue; continue;
if (filled < 0) if (filled < 0)
throw new IOException("Broken pipe"); throw new IOException("Broken pipe");
} }
catch(IOException e) catch (IOException e)
{ {
LOG.debug(e); LOG.debug(e);
close(e); close(e);
@ -1032,7 +1041,7 @@ public class SslConnection extends AbstractConnection
LOG.debug("<onIncompleteFlush s={}/{} fi={} w={}", _flushState, _fillState, fillInterest, BufferUtil.toDetailString(write)); LOG.debug("<onIncompleteFlush s={}/{} fi={} w={}", _flushState, _fillState, fillInterest, BufferUtil.toDetailString(write));
} }
if (write!=null) if (write != null)
getEndPoint().write(_incompleteWriteCallback, write); getEndPoint().write(_incompleteWriteCallback, write);
else if (fillInterest) else if (fillInterest)
ensureFillInterested(); ensureFillInterested();
@ -1053,7 +1062,7 @@ public class SslConnection extends AbstractConnection
{ {
boolean close; boolean close;
boolean flush = false; boolean flush = false;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
boolean ishut = getEndPoint().isInputShutdown(); boolean ishut = getEndPoint().isInputShutdown();
boolean oshut = getEndPoint().isOutputShutdown(); boolean oshut = getEndPoint().isOutputShutdown();
@ -1101,7 +1110,7 @@ public class SslConnection extends AbstractConnection
private void ensureFillInterested() private void ensureFillInterested()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ensureFillInterested {}",SslConnection.this); LOG.debug("ensureFillInterested {}", SslConnection.this);
SslConnection.this.tryFillInterested(_sslReadCallback); SslConnection.this.tryFillInterested(_sslReadCallback);
} }
@ -1142,7 +1151,7 @@ public class SslConnection extends AbstractConnection
@Override @Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
return getEndPoint().isInputShutdown() || isInboundDone(); return BufferUtil.isEmpty(_decryptedInput) && (getEndPoint().isInputShutdown() || isInboundDone());
} }
private boolean isInboundDone() private boolean isInboundDone()
@ -1215,7 +1224,7 @@ public class SslConnection extends AbstractConnection
return false; return false;
} }
if (getRenegotiationLimit()==0) if (getRenegotiationLimit() == 0)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Renegotiation limit exceeded {}", SslConnection.this); LOG.debug("Renegotiation limit exceeded {}", SslConnection.this);
@ -1244,14 +1253,14 @@ public class SslConnection extends AbstractConnection
public void succeeded() public void succeeded()
{ {
boolean fillable; boolean fillable;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this); LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this);
releaseEncryptedOutputBuffer(); releaseEncryptedOutputBuffer();
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
fillable = _fillState==FillState.WAIT_FOR_FLUSH; fillable = _fillState == FillState.WAIT_FOR_FLUSH;
if (fillable) if (fillable)
_fillState = FillState.IDLE; _fillState = FillState.IDLE;
} }
@ -1266,7 +1275,7 @@ public class SslConnection extends AbstractConnection
public void failed(final Throwable x) public void failed(final Throwable x)
{ {
boolean fail_fill_interest; boolean fail_fill_interest;
synchronized(_decryptedEndPoint) synchronized (_decryptedEndPoint)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x); LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x);
@ -1275,12 +1284,12 @@ public class SslConnection extends AbstractConnection
releaseEncryptedOutputBuffer(); releaseEncryptedOutputBuffer();
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
fail_fill_interest = _fillState==FillState.WAIT_FOR_FLUSH; fail_fill_interest = _fillState == FillState.WAIT_FOR_FLUSH;
if (fail_fill_interest) if (fail_fill_interest)
_fillState = FillState.IDLE; _fillState = FillState.IDLE;
} }
getExecutor().execute(()-> getExecutor().execute(() ->
{ {
if (fail_fill_interest) if (fail_fill_interest)
_decryptedEndPoint.getFillInterest().onFail(x); _decryptedEndPoint.getFillInterest().onFail(x);
@ -1297,7 +1306,7 @@ public class SslConnection extends AbstractConnection
@Override @Override
public String toString() public String toString()
{ {
return String.format("SSL@%h.DEP.writeCallback",SslConnection.this); return String.format("SSL@%h.DEP.writeCallback", SslConnection.this);
} }
} }
} }

View File

@ -263,14 +263,26 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (suspended || getEndPoint().getConnection() != this) if (suspended || getEndPoint().getConnection() != this)
break; break;
} }
else else if (filled==0)
{ {
if (filled <= 0)
{
if (filled == 0)
fillInterested(); fillInterested();
break; break;
} }
else if (filled<0)
{
switch(_channel.getState().getState())
{
case COMPLETING:
case COMPLETED:
case IDLE:
case THROWN:
case ASYNC_ERROR:
getEndPoint().shutdownOutput();
break;
default:
break;
}
break;
} }
} }
} }
@ -310,16 +322,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (BufferUtil.isEmpty(_requestBuffer)) if (BufferUtil.isEmpty(_requestBuffer))
{ {
// Can we fill?
if(getEndPoint().isInputShutdown())
{
// No pretend we read -1
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer));
return -1;
}
// Get a buffer // Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request, // We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed. // so there are not an possible legal threads calling #parseContent or #completed.
@ -411,13 +413,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (_input.isAsync()) if (_input.isAsync())
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unconsumed async input {}", this); LOG.debug("{}unconsumed input {}",_parser.isChunking()?"Possible ":"", this);
_channel.abort(new IOException("unconsumed input")); _channel.abort(new IOException("unconsumed input"));
} }
else else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this); LOG.debug("{}unconsumed input {}",_parser.isChunking()?"Possible ":"", this);
// Complete reading the request // Complete reading the request
if (!_input.consumeAll()) if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input")); _channel.abort(new IOException("unconsumed input"));

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.EOFException; import java.io.EOFException;
@ -58,9 +48,20 @@ import org.eclipse.jetty.util.log.AbstractLogger;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class HttpServerTestBase extends HttpServerTestFixture public abstract class HttpServerTestBase extends HttpServerTestFixture
{ {
@ -1692,4 +1693,89 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
client.close(); client.close();
} }
} }
@Test
@DisabledOnJre({JRE.JAVA_8, JRE.JAVA_9, JRE.JAVA_10})
public void testShutdown() throws Exception
{
configureServer(new ReadExactHandler());
byte[] content = new byte[4096];
Arrays.fill(content,(byte)'X');
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
// Send two persistent pipelined requests and then shutdown output
os.write(("GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Content-Length: "+content.length+"\r\n" +
"\r\n").getBytes(StandardCharsets.ISO_8859_1));
os.write(content);
os.write(("GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Content-Length: "+content.length+"\r\n" +
"\r\n").getBytes(StandardCharsets.ISO_8859_1));
os.write(content);
os.flush();
// Thread.sleep(50);
client.shutdownOutput();
// Read the two pipelined responses
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("Read "+content.length));
response = HttpTester.parseResponse(client.getInputStream());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("Read "+content.length));
// Read the close
assertThat(client.getInputStream().read(),is(-1));
}
}
@Test
@DisabledOnJre({JRE.JAVA_8, JRE.JAVA_9, JRE.JAVA_10})
public void testChunkedShutdown() throws Exception
{
configureServer(new ReadExactHandler(4096));
byte[] content = new byte[4096];
Arrays.fill(content,(byte)'X');
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
// Send two persistent pipelined requests and then shutdown output
os.write(("GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"1000\r\n").getBytes(StandardCharsets.ISO_8859_1));
os.write(content);
os.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.write(("GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"1000\r\n").getBytes(StandardCharsets.ISO_8859_1));
os.write(content);
os.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
client.shutdownOutput();
// Read the two pipelined responses
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("Read "+content.length));
response = HttpTester.parseResponse(client.getInputStream());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("Read "+content.length));
// Read the close
assertThat(client.getInputStream().read(),is(-1));
}
}
} }

View File

@ -27,6 +27,7 @@ import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -186,6 +187,54 @@ public class HttpServerTestFixture
} }
} }
protected static class ReadExactHandler extends AbstractHandler.ErrorDispatchHandler
{
private int expected;
public ReadExactHandler()
{
this(-1);
}
public ReadExactHandler(int expected)
{
this.expected = expected;
}
@Override
public void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
int len = expected<0?request.getContentLength():expected;
if (len<0)
throw new IllegalStateException();
byte[] content = new byte[len];
int offset = 0;
while (offset<len)
{
int read = request.getInputStream().read(content,offset,len-offset);
if (read<0)
break;
offset+=read;
}
response.setStatus(200);
String reply = "Read " + offset + "\r\n";
response.setContentLength(reply.length());
response.getOutputStream().write(reply.getBytes(StandardCharsets.ISO_8859_1));
}
@Override
protected void doError(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
System.err.println("ERROR: "+request.getAttribute(RequestDispatcher.ERROR_MESSAGE));
Throwable th = (Throwable)request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
if (th!=null)
th.printStackTrace();
super.doError(target, baseRequest, request, response);
}
}
protected static class ReadHandler extends AbstractHandler protected static class ReadHandler extends AbstractHandler
{ {
@Override @Override

View File

@ -0,0 +1,162 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.ssl;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import javax.net.ssl.SSLContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
// Only in JDK 11 is possible to use SSLSocket.shutdownOutput().
@DisabledOnJre({JRE.JAVA_8, JRE.JAVA_9, JRE.JAVA_10})
public class SSLReadEOFAfterResponseTest
{
@Test
public void testReadEOFAfterResponse() throws Exception
{
File keystore = MavenTestingUtils.getTestResourceFile("keystore");
SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStoreResource(Resource.newResource(keystore));
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setKeyManagerPassword("keypwd");
Server server = new Server();
ServerConnector connector = new ServerConnector(server, sslContextFactory);
int idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
String content = "the quick brown fox jumped over the lazy dog";
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
// First: read the whole content.
InputStream input = request.getInputStream();
int length = bytes.length;
while (length > 0)
{
int read = input.read();
if (read < 0)
throw new IllegalStateException();
--length;
}
// Second: write the response.
response.setContentLength(bytes.length);
response.getOutputStream().write(bytes);
response.flushBuffer();
sleep(idleTimeout / 2);
// Third, read the EOF.
int read = input.read();
if (read >= 0)
throw new IllegalStateException();
}
});
server.start();
try
{
SSLContext sslContext = sslContextFactory.getSslContext();
try (Socket client = sslContext.getSocketFactory().createSocket("localhost", connector.getLocalPort()))
{
client.setSoTimeout(5 * idleTimeout);
OutputStream output = client.getOutputStream();
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Content-Length: " + content.length() + "\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.write(bytes);
output.flush();
// Read the response.
InputStream input = client.getInputStream();
int crlfs = 0;
while (true)
{
int read = input.read();
assertThat(read, Matchers.greaterThanOrEqualTo(0));
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break;
}
for (byte b : bytes)
assertEquals(b, input.read());
// Shutdown the output so the server reads the TLS close_notify.
client.shutdownOutput();
// client.close();
// The connection should now be idle timed out by the server.
int read = input.read();
assertEquals(-1, read);
}
}
finally
{
server.stop();
}
}
private void sleep(long time) throws IOException
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}

View File

@ -1,5 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG
#org.eclipse.jetty.server.ConnectionLimit.LEVEL=DEBUG #org.eclipse.jetty.server.ConnectionLimit.LEVEL=DEBUG
#org.eclipse.jetty.server.AcceptRateLimit.LEVEL=DEBUG #org.eclipse.jetty.server.AcceptRateLimit.LEVEL=DEBUG