From 8c4ee8496feb11fc0a0992834c60c3c6fdd1efe9 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 20 Nov 2018 13:53:42 +0100 Subject: [PATCH] Jetty 9.4.x 3038 ssl connection leak (#3121) Issue #3038 - SSL connection leak. Fixed SSL spin caused when fill had NEED_WRAP, but a flush/wrap produced 0 bytes and stayed in NEED_WRAP Removed check of isInputShutdown prior to filling that allowed EOF to overtake data already read. Fix for leak by shutting down output in HttpConnection if filled -1 and the HttpChannelState was no longer processing current request. Signed-off-by: Simone Bordet Signed-off-by: Greg Wilkins --- .../client/http/HttpReceiverOverHTTP.java | 17 ++ .../org/eclipse/jetty/http/HttpParser.java | 21 +- .../org/eclipse/jetty/http/HttpTester.java | 26 +- .../eclipse/jetty/io/ssl/SslConnection.java | 255 +++++++++--------- .../eclipse/jetty/server/HttpConnection.java | 36 +-- .../jetty/server/HttpServerTestBase.java | 108 +++++++- .../jetty/server/HttpServerTestFixture.java | 49 ++++ .../ssl/SSLReadEOFAfterResponseTest.java | 162 +++++++++++ .../test/resources/jetty-logging.properties | 1 - 9 files changed, 507 insertions(+), 168 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SSLReadEOFAfterResponseTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index d6a9c2f96a5..3be68769bb5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.client.http; import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpExchange; @@ -39,6 +40,7 @@ import org.eclipse.jetty.util.CompletableCallback; public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler { + private final AtomicReference handlingContent = new AtomicReference<>(ContentState.IDLE); private final HttpParser parser; private ByteBuffer buffer; private boolean shutdown; @@ -263,8 +265,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (exchange == null) return false; + handlingContent.set(ContentState.CONTENT); CompletableCallback callback = new CompletableCallback() { + @Override + public void succeeded() + { + boolean messageComplete = !handlingContent.compareAndSet(ContentState.CONTENT, ContentState.IDLE); + super.succeeded(); + if (messageComplete) + messageComplete(); + } + @Override public void resume() { @@ -304,6 +316,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res @Override public boolean messageComplete() { + if (handlingContent.compareAndSet(ContentState.CONTENT, ContentState.COMPLETE)) + return false; + HttpExchange exchange = getHttpExchange(); if (exchange == null) return false; @@ -375,4 +390,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { return String.format("%s[%s]", super.toString(), parser); } + + private enum ContentState { IDLE, CONTENT, COMPLETE } } diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java index 579b5ac90e5..3c0405e9cf7 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java @@ -767,7 +767,7 @@ public class HttpParser case LF: setState(State.HEADER); - handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle; + handle |= _responseHandler.startResponse(_version, _responseStatus, null); break; default: @@ -789,7 +789,7 @@ public class HttpParser handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9); setState(State.END); BufferUtil.clear(buffer); - handle= handleHeaderContentMessage() || handle; + handle |= handleHeaderContentMessage(); break; case ALPHA: @@ -865,7 +865,7 @@ public class HttpParser if (_responseHandler!=null) { setState(State.HEADER); - handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle; + handle |= _responseHandler.startResponse(_version, _responseStatus, null); } else { @@ -876,7 +876,7 @@ public class HttpParser handle=_requestHandler.startRequest(_methodString,_uri.toString(), HttpVersion.HTTP_0_9); setState(State.END); BufferUtil.clear(buffer); - handle= handleHeaderContentMessage() || handle; + handle |= handleHeaderContentMessage(); } break; @@ -905,7 +905,7 @@ public class HttpParser setState(State.HEADER); - handle=_requestHandler.startRequest(_methodString,_uri.toString(), _version)||handle; + handle |= _requestHandler.startRequest(_methodString,_uri.toString(), _version); continue; case ALPHA: @@ -927,7 +927,7 @@ public class HttpParser case LF: String reason=takeString(); setState(State.HEADER); - handle=_responseHandler.startResponse(_version, _responseStatus, reason)||handle; + handle |= _responseHandler.startResponse(_version, _responseStatus, reason); continue; case ALPHA: @@ -1660,14 +1660,16 @@ public class HttpParser _contentPosition += _contentChunk.remaining(); buffer.position(buffer.position()+_contentChunk.remaining()); - if (_handler.content(_contentChunk)) - return true; + boolean handle = _handler.content(_contentChunk); if(_contentPosition == _contentLength) { setState(State.END); - return handleContentMessage(); + boolean handleContent = handleContentMessage(); + return handle || handleContent; } + else if (handle) + return true; } break; } @@ -1808,7 +1810,6 @@ public class HttpParser /* ------------------------------------------------------------------------------- */ public boolean isAtEOF() - { return _eof; } diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpTester.java b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpTester.java index 60d5a50c06e..6b2a2f01acf 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpTester.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpTester.java @@ -27,7 +27,6 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -110,13 +109,28 @@ public class HttpTester public static Response parseResponse(InputStream responseStream) throws IOException { - ByteArrayOutputStream contentStream = new ByteArrayOutputStream(); - IO.copy(responseStream, contentStream); - Response r=new Response(); HttpParser parser =new HttpParser(r); - parser.parseNext(ByteBuffer.wrap(contentStream.toByteArray())); - return r; + + // Read and parse a character at a time so we never can read more than we should. + byte[] array = new byte[1]; + ByteBuffer buffer = ByteBuffer.wrap(array); + buffer.limit(1); + + while(true) + { + buffer.position(1); + int l = responseStream.read(array); + if (l<0) + parser.atEOF(); + else + buffer.position(0); + + if (parser.parseNext(buffer)) + return r; + else if (l<0) + return null; + } } public abstract static class Input diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 3b1d00c573b..47f81f633e4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -80,28 +80,28 @@ public class SslConnection extends AbstractConnection { private static final Logger LOG = Log.getLogger(SslConnection.class); private static final String TLS_1_3 = "TLSv1.3"; - + private enum Handshake { INITIAL, SUCCEEDED, FAILED } - - private enum FillState + + private enum FillState { IDLE, // Not Filling any data INTERESTED, // We have a pending read interest WAIT_FOR_FLUSH // Waiting for a flush to happen } - - private enum FlushState - { + + private enum FlushState + { IDLE, // Not flushing any data WRITING, // We have a pending write of encrypted data WAIT_FOR_FILL // Waiting for a fill to happen } - + private final List handshakeListeners = new ArrayList<>(); private final ByteBufferPool _bufferPool; private final SSLEngine _sslEngine; @@ -119,20 +119,20 @@ public class SslConnection extends AbstractConnection private FillState _fillState = FillState.IDLE; private AtomicReference _handshake = new AtomicReference<>(Handshake.INITIAL); private boolean _underflown; - - private abstract class RunnableTask implements Runnable, Invocable + + private abstract class RunnableTask implements Runnable, Invocable { private final String _operation; protected RunnableTask(String op) { - _operation=op; + _operation = op; } @Override public String toString() { - return String.format("SSL:%s:%s:%s",SslConnection.this,_operation,getInvocationType()); + return String.format("SSL:%s:%s:%s", SslConnection.this, _operation, getInvocationType()); } } @@ -174,7 +174,7 @@ public class SslConnection extends AbstractConnection @Override public String toString() { - return String.format("SSLC.NBReadCB@%x{%s}", SslConnection.this.hashCode(),SslConnection.this); + return String.format("SSLC.NBReadCB@%x{%s}", SslConnection.this.hashCode(), SslConnection.this); } }; @@ -233,7 +233,7 @@ public class SslConnection extends AbstractConnection /** * @return The number of renegotions allowed for this connection. When the limit - * is 0 renegotiation will be denied. If the limit is less than 0 then no limit is applied. + * is 0 renegotiation will be denied. If the limit is less than 0 then no limit is applied. */ public int getRenegotiationLimit() { @@ -241,9 +241,9 @@ public class SslConnection extends AbstractConnection } /** - * @param renegotiationLimit The number of renegotions allowed for this connection. - * When the limit is 0 renegotiation will be denied. If the limit is less than 0 then no limit is applied. - * Default -1. + * @param renegotiationLimit The number of renegotions allowed for this connection. + * When the limit is 0 renegotiation will be denied. If the limit is less than 0 then no limit is applied. + * Default -1. */ public void setRenegotiationLimit(int renegotiationLimit) { @@ -311,26 +311,26 @@ public class SslConnection extends AbstractConnection @Override public void onFillInterestedFailed(Throwable cause) { - _decryptedEndPoint.onFillableFail(cause==null?new IOException():cause); + _decryptedEndPoint.onFillableFail(cause == null ? new IOException() : cause); } @Override public String toConnectionString() { ByteBuffer b = _encryptedInput; - int ei=b==null?-1:b.remaining(); + int ei = b == null ? -1 : b.remaining(); b = _encryptedOutput; - int eo=b==null?-1:b.remaining(); + int eo = b == null ? -1 : b.remaining(); b = _decryptedInput; - int di=b==null?-1:b.remaining(); + int di = b == null ? -1 : b.remaining(); Connection connection = _decryptedEndPoint.getConnection(); return String.format("%s@%x{%s,eio=%d/%d,di=%d,fill=%s,flush=%s}~>%s=>%s", getClass().getSimpleName(), hashCode(), _sslEngine.getHandshakeStatus(), - ei,eo,di, - _fillState,_flushState, + ei, eo, di, + _fillState, _flushState, _decryptedEndPoint.toEndPointString(), connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection); } @@ -349,7 +349,7 @@ public class SslConnection extends AbstractConnection public class DecryptedEndPoint extends AbstractEndPoint { private final Callback _incompleteWriteCallback = new IncompleteWriteCallback(); - + public DecryptedEndPoint() { // Disable idle timeout checking: no scheduler and -1 timeout for this instance. @@ -399,22 +399,22 @@ public class SslConnection extends AbstractConnection { // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read boolean waiting_for_fill; - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { if (LOG.isDebugEnabled()) LOG.debug("onFillable {}", SslConnection.this); _fillState = FillState.IDLE; - waiting_for_fill = _flushState==FlushState.WAIT_FOR_FILL; + waiting_for_fill = _flushState == FlushState.WAIT_FOR_FILL; } - + getFillInterest().fillable(); - + if (waiting_for_fill) { - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { - waiting_for_fill = _flushState==FlushState.WAIT_FOR_FILL; + waiting_for_fill = _flushState == FlushState.WAIT_FOR_FILL; } if (waiting_for_fill) fill(BufferUtil.EMPTY_BUFFER); @@ -430,13 +430,13 @@ public class SslConnection extends AbstractConnection { // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read boolean fail = false; - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { if (LOG.isDebugEnabled()) LOG.debug("onFillableFail {}", SslConnection.this, failure); - + _fillState = FillState.IDLE; - switch(_flushState) + switch (_flushState) { case WAIT_FOR_FILL: _flushState = FlushState.IDLE; @@ -444,12 +444,12 @@ public class SslConnection extends AbstractConnection break; default: break; - } + } } // wake up whoever is doing the fill getFillInterest().onFail(failure); - + // Try to complete the write if (fail) { @@ -464,7 +464,7 @@ public class SslConnection extends AbstractConnection if (connection instanceof AbstractConnection) { AbstractConnection a = (AbstractConnection)connection; - if (a.getInputBufferSize()<_sslEngine.getSession().getApplicationBufferSize()) + if (a.getInputBufferSize() < _sslEngine.getSession().getApplicationBufferSize()) a.setInputBufferSize(_sslEngine.getSession().getApplicationBufferSize()); } super.setConnection(connection); @@ -480,7 +480,7 @@ public class SslConnection extends AbstractConnection { try { - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { if (LOG.isDebugEnabled()) LOG.debug(">fill {}", SslConnection.this); @@ -488,20 +488,20 @@ public class SslConnection extends AbstractConnection int filled = -2; try { - if (_fillState!=FillState.IDLE) + if (_fillState != FillState.IDLE) return filled = 0; - + // Do we already have some decrypted data? if (BufferUtil.hasContent(_decryptedInput)) - return filled = BufferUtil.append(buffer,_decryptedInput); - + return filled = BufferUtil.append(buffer, _decryptedInput); + // loop filling and unwrapping until we have something while (true) { HandshakeStatus status = _sslEngine.getHandshakeStatus(); if (LOG.isDebugEnabled()) LOG.debug("fill {}", status); - switch(status) + switch (status) { case NEED_UNWRAP: case NOT_HANDSHAKING: @@ -510,20 +510,25 @@ public class SslConnection extends AbstractConnection case NEED_TASK: _sslEngine.getDelegatedTask().run(); continue; - + case NEED_WRAP: - if (_flushState==FlushState.IDLE && flush(BufferUtil.EMPTY_BUFFER)) + if (_flushState == FlushState.IDLE && flush(BufferUtil.EMPTY_BUFFER)) + { + if (_sslEngine.isInboundDone()) + // TODO this is probably a JVM bug, work around it by -1 + return -1; continue; + } // handle in needsFillInterest return filled = 0; - + default: throw new IllegalStateException("Unexpected HandshakeStatus " + status); } - - if (_encryptedInput==null) + + if (_encryptedInput == null) _encryptedInput = _bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize(), _encryptedDirectBuffers); - + // can we use the passed buffer if it is big enough ByteBuffer app_in; if (_decryptedInput == null) @@ -538,7 +543,7 @@ public class SslConnection extends AbstractConnection app_in = _decryptedInput; BufferUtil.compact(_encryptedInput); } - + // Let's try reading some encrypted data... even if we have some already. int net_filled = getEndPoint().fill(_encryptedInput); @@ -561,26 +566,26 @@ public class SslConnection extends AbstractConnection { BufferUtil.flipToFlush(app_in, pos); } - if (LOG.isDebugEnabled()) - LOG.debug("unwrap {} {} unwrapBuffer={} appBuffer={}", - net_filled, - unwrapResult.toString().replace('\n',' '), - BufferUtil.toDetailString(app_in), - BufferUtil.toDetailString(buffer)); + LOG.debug("unwrap net_filled={} {} encryptedBuffer={} unwrapBuffer={} appBuffer={}", + net_filled, + unwrapResult.toString().replace('\n', ' '), + BufferUtil.toSummaryString(_encryptedInput), + BufferUtil.toDetailString(app_in), + BufferUtil.toDetailString(buffer)); SSLEngineResult.Status unwrap = unwrapResult.getStatus(); // Extra check on unwrapResultStatus == OK with zero bytes consumed // or produced is due to an SSL client on Android (see bug #454773). - if (unwrap==Status.OK && unwrapResult.bytesConsumed() == 0 && unwrapResult.bytesProduced() == 0) + if (unwrap == Status.OK && unwrapResult.bytesConsumed() == 0 && unwrapResult.bytesProduced() == 0) unwrap = Status.BUFFER_UNDERFLOW; - + switch (unwrap) { case CLOSED: return filled = -1; - + case BUFFER_UNDERFLOW: if (net_filled > 0) continue; // try filling some more @@ -596,7 +601,7 @@ public class SslConnection extends AbstractConnection { if (unwrapResult.getHandshakeStatus() == HandshakeStatus.FINISHED) handshakeSucceeded(); - + if (isRenegotiating() && !allowRenegotiate()) return filled = -1; @@ -605,14 +610,14 @@ public class SslConnection extends AbstractConnection // another call to fill() or flush(). if (unwrapResult.bytesProduced() > 0) { - if (app_in==buffer) + if (app_in == buffer) return filled = unwrapResult.bytesProduced(); - return filled = BufferUtil.append(buffer,_decryptedInput); + return filled = BufferUtil.append(buffer, _decryptedInput); } - + break; } - + default: throw new IllegalStateException("Unexpected unwrap result " + unwrap); } @@ -622,10 +627,10 @@ public class SslConnection extends AbstractConnection { handshakeFailed(x); - if (_flushState==FlushState.WAIT_FOR_FILL) + if (_flushState == FlushState.WAIT_FOR_FILL) { - _flushState=FlushState.IDLE; - getExecutor().execute(()->_decryptedEndPoint.getWriteFlusher().onFail(x)); + _flushState = FlushState.IDLE; + getExecutor().execute(() -> _decryptedEndPoint.getWriteFlusher().onFail(x)); } throw x; @@ -637,19 +642,19 @@ public class SslConnection extends AbstractConnection _bufferPool.release(_encryptedInput); _encryptedInput = null; } - + if (_decryptedInput != null && !_decryptedInput.hasRemaining()) { _bufferPool.release(_decryptedInput); _decryptedInput = null; } - if (_flushState==FlushState.WAIT_FOR_FILL) + if (_flushState == FlushState.WAIT_FOR_FILL) { - _flushState=FlushState.IDLE; - getExecutor().execute(()->_decryptedEndPoint.getWriteFlusher().completeWrite()); + _flushState = FlushState.IDLE; + getExecutor().execute(() -> _decryptedEndPoint.getWriteFlusher().completeWrite()); } - + if (LOG.isDebugEnabled()) LOG.debug("needFillInterest uf={} {}", _underflown, SslConnection.this); - LOG.debug("ei={} di={}",BufferUtil.toDetailString(_encryptedInput),BufferUtil.toDetailString(_decryptedInput)); + LOG.debug("ei={} di={}", BufferUtil.toDetailString(_encryptedInput), BufferUtil.toDetailString(_decryptedInput)); } - if (_fillState!=FillState.IDLE) + if (_fillState != FillState.IDLE) return; // Fillable if we have decrypted Input OR encrypted input that has not yet been underflown. @@ -720,10 +725,10 @@ public class SslConnection extends AbstractConnection } if (LOG.isDebugEnabled()) - LOG.debug("0) + if (_renegotiationLimit > 0) _renegotiationLimit--; } } @@ -799,18 +804,18 @@ public class SslConnection extends AbstractConnection LOG.ignore(x); } } - + @Override public boolean flush(ByteBuffer... appOuts) throws IOException { try { - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { if (LOG.isDebugEnabled()) { LOG.debug(">flush {}", SslConnection.this); - int i=0; + int i = 0; for (ByteBuffer b : appOuts) LOG.debug("flush b[{}]={}", i++, BufferUtil.toDetailString(b)); } @@ -818,7 +823,7 @@ public class SslConnection extends AbstractConnection Boolean result = null; try { - if (_flushState!=FlushState.IDLE) + if (_flushState != FlushState.IDLE) return result = false; // Keep going while we can make progress or until we are done @@ -826,8 +831,8 @@ public class SslConnection extends AbstractConnection { HandshakeStatus status = _sslEngine.getHandshakeStatus(); if (LOG.isDebugEnabled()) - LOG.debug("flush {}",status); - switch(status) + LOG.debug("flush {}", status); + switch (status) { case NEED_WRAP: case NOT_HANDSHAKING: @@ -836,18 +841,18 @@ public class SslConnection extends AbstractConnection case NEED_TASK: _sslEngine.getDelegatedTask().run(); continue; - + case NEED_UNWRAP: - if (_fillState==FillState.IDLE) + if (_fillState == FillState.IDLE) { int filled = fill(BufferUtil.EMPTY_BUFFER); - if (_sslEngine.getHandshakeStatus()!=status) + if (_sslEngine.getHandshakeStatus() != status) continue; if (filled < 0) throw new IOException("Broken pipe"); } return result = false; - + default: throw new IllegalStateException("Unexpected HandshakeStatus " + status); } @@ -862,19 +867,23 @@ public class SslConnection extends AbstractConnection try { wrapResult = _sslEngine.wrap(appOuts, _encryptedOutput); - if (LOG.isDebugEnabled()) - LOG.debug("wrap {} {}", wrapResult.toString().replace('\n',' '), BufferUtil.toHexSummary(_encryptedOutput)); } finally { BufferUtil.flipToFlush(_encryptedOutput, pos); } - + if (LOG.isDebugEnabled()) + LOG.debug("wrap {} {} ioDone={}/{}", + wrapResult.toString().replace('\n', ' '), + BufferUtil.toSummaryString(_encryptedOutput), + _sslEngine.isInboundDone(), + _sslEngine.isOutboundDone()); + // Was all the data consumed? - boolean allConsumed=true; + boolean allConsumed = true; for (ByteBuffer b : appOuts) if (BufferUtil.hasContent(b)) - allConsumed=false; + allConsumed = false; // if we have net bytes, let's try to flush them boolean flushed = true; @@ -899,12 +908,12 @@ public class SslConnection extends AbstractConnection return result = true; throw new IOException("Broken pipe"); } - + case BUFFER_OVERFLOW: if (!flushed) return result = false; continue; - + case OK: if (wrapResult.getHandshakeStatus() == HandshakeStatus.FINISHED) handshakeSucceeded(); @@ -960,24 +969,24 @@ public class SslConnection extends AbstractConnection { boolean fillInterest = false; ByteBuffer write = null; - synchronized(_decryptedEndPoint) + synchronized (_decryptedEndPoint) { if (LOG.isDebugEnabled()) LOG.debug(">onIncompleteFlush {} {}", SslConnection.this, BufferUtil.toDetailString(_encryptedOutput)); - if (_flushState!=FlushState.IDLE) + if (_flushState != FlushState.IDLE) return; - while(true) + while (true) { HandshakeStatus status = _sslEngine.getHandshakeStatus(); - switch(status) + switch (status) { case NEED_TASK: case NEED_WRAP: case NOT_HANDSHAKING: // write what we have or an empty buffer to reschedule a call to flush - write = BufferUtil.hasContent(_encryptedOutput)?_encryptedOutput:BufferUtil.EMPTY_BUFFER; + write = BufferUtil.hasContent(_encryptedOutput) ? _encryptedOutput : BufferUtil.EMPTY_BUFFER; _flushState = FlushState.WRITING; break; @@ -990,7 +999,7 @@ public class SslConnection extends AbstractConnection break; } - if (_fillState!=FillState.IDLE) + if (_fillState != FillState.IDLE) { // Wait for a fill that is happening anyway _flushState = FlushState.WAIT_FOR_FILL; @@ -1002,12 +1011,12 @@ public class SslConnection extends AbstractConnection { int filled = fill(BufferUtil.EMPTY_BUFFER); // If this changed the status, let's try again - if (_sslEngine.getHandshakeStatus()!=status) + if (_sslEngine.getHandshakeStatus() != status) continue; if (filled < 0) throw new IOException("Broken pipe"); } - catch(IOException e) + catch (IOException e) { LOG.debug(e); close(e); @@ -1032,7 +1041,7 @@ public class SslConnection extends AbstractConnection LOG.debug(" + + getExecutor().execute(() -> { if (fail_fill_interest) _decryptedEndPoint.getFillInterest().onFail(x); _decryptedEndPoint.getWriteFlusher().onFail(x); }); } - + @Override public InvocationType getInvocationType() { return _decryptedEndPoint.getWriteFlusher().getCallbackInvocationType(); } - + @Override public String toString() { - return String.format("SSL@%h.DEP.writeCallback",SslConnection.this); + return String.format("SSL@%h.DEP.writeCallback", SslConnection.this); } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 6eaa6c04e07..97583b4d33d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -263,14 +263,26 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (suspended || getEndPoint().getConnection() != this) break; } - else + else if (filled==0) { - if (filled <= 0) + fillInterested(); + break; + } + else if (filled<0) + { + switch(_channel.getState().getState()) { - if (filled == 0) - fillInterested(); - break; + case COMPLETING: + case COMPLETED: + case IDLE: + case THROWN: + case ASYNC_ERROR: + getEndPoint().shutdownOutput(); + break; + default: + break; } + break; } } } @@ -310,16 +322,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (BufferUtil.isEmpty(_requestBuffer)) { - // Can we fill? - if(getEndPoint().isInputShutdown()) - { - // No pretend we read -1 - _parser.atEOF(); - if (LOG.isDebugEnabled()) - LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer)); - return -1; - } - // Get a buffer // We are not in a race here for the request buffer as we have not yet received a request, // so there are not an possible legal threads calling #parseContent or #completed. @@ -411,13 +413,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (_input.isAsync()) { if (LOG.isDebugEnabled()) - LOG.debug("unconsumed async input {}", this); + LOG.debug("{}unconsumed input {}",_parser.isChunking()?"Possible ":"", this); _channel.abort(new IOException("unconsumed input")); } else { if (LOG.isDebugEnabled()) - LOG.debug("unconsumed input {}", this); + LOG.debug("{}unconsumed input {}",_parser.isChunking()?"Possible ":"", this); // Complete reading the request if (!_input.consumeAll()) _channel.abort(new IOException("unconsumed input")); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index 49021ba059f..45c11c43b9a 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.server; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.startsWith; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.EOFException; @@ -58,9 +48,20 @@ import org.eclipse.jetty.util.log.AbstractLogger; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.junit.jupiter.api.condition.DisabledOnJre; +import org.junit.jupiter.api.condition.JRE; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class HttpServerTestBase extends HttpServerTestFixture { @@ -1692,4 +1693,89 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture client.close(); } } + + @Test + @DisabledOnJre({JRE.JAVA_8, JRE.JAVA_9, JRE.JAVA_10}) + public void testShutdown() throws Exception + { + configureServer(new ReadExactHandler()); + byte[] content = new byte[4096]; + Arrays.fill(content,(byte)'X'); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + + // Send two persistent pipelined requests and then shutdown output + os.write(("GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Length: "+content.length+"\r\n" + + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); + os.write(content); + os.write(("GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Length: "+content.length+"\r\n" + + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); + os.write(content); + os.flush(); + // Thread.sleep(50); + client.shutdownOutput(); + + // Read the two pipelined responses + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("Read "+content.length)); + + response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("Read "+content.length)); + + // Read the close + assertThat(client.getInputStream().read(),is(-1)); + } + } + + @Test + @DisabledOnJre({JRE.JAVA_8, JRE.JAVA_9, JRE.JAVA_10}) + public void testChunkedShutdown() throws Exception + { + configureServer(new ReadExactHandler(4096)); + byte[] content = new byte[4096]; + Arrays.fill(content,(byte)'X'); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + + // Send two persistent pipelined requests and then shutdown output + os.write(("GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Transfer-Encoding: chunked\r\n" + + "\r\n" + + "1000\r\n").getBytes(StandardCharsets.ISO_8859_1)); + os.write(content); + os.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.write(("GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Transfer-Encoding: chunked\r\n" + + "\r\n" + + "1000\r\n").getBytes(StandardCharsets.ISO_8859_1)); + os.write(content); + os.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + client.shutdownOutput(); + + // Read the two pipelined responses + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("Read "+content.length)); + + response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("Read "+content.length)); + + // Read the close + assertThat(client.getInputStream().read(),is(-1)); + } + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java index b93d761f8e6..c86691db5b2 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java @@ -27,6 +27,7 @@ import java.net.Socket; import java.net.URI; import java.nio.charset.StandardCharsets; +import javax.servlet.RequestDispatcher; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -185,6 +186,54 @@ public class HttpServerTestFixture response.getOutputStream().print("Hello world\r\n"); } } + + + protected static class ReadExactHandler extends AbstractHandler.ErrorDispatchHandler + { + private int expected; + + public ReadExactHandler() + { + this(-1); + } + + public ReadExactHandler(int expected) + { + this.expected = expected; + } + + @Override + public void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + int len = expected<0?request.getContentLength():expected; + if (len<0) + throw new IllegalStateException(); + byte[] content = new byte[len]; + int offset = 0; + while (offset 0) + { + int read = input.read(); + if (read < 0) + throw new IllegalStateException(); + --length; + } + + // Second: write the response. + response.setContentLength(bytes.length); + response.getOutputStream().write(bytes); + response.flushBuffer(); + + sleep(idleTimeout / 2); + + // Third, read the EOF. + int read = input.read(); + if (read >= 0) + throw new IllegalStateException(); + } + }); + server.start(); + + try + { + SSLContext sslContext = sslContextFactory.getSslContext(); + try (Socket client = sslContext.getSocketFactory().createSocket("localhost", connector.getLocalPort())) + { + client.setSoTimeout(5 * idleTimeout); + + OutputStream output = client.getOutputStream(); + String request = "" + + "POST / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Length: " + content.length() + "\r\n" + + "\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.write(bytes); + output.flush(); + + // Read the response. + InputStream input = client.getInputStream(); + int crlfs = 0; + while (true) + { + int read = input.read(); + assertThat(read, Matchers.greaterThanOrEqualTo(0)); + if (read == '\r' || read == '\n') + ++crlfs; + else + crlfs = 0; + if (crlfs == 4) + break; + } + for (byte b : bytes) + assertEquals(b, input.read()); + + + // Shutdown the output so the server reads the TLS close_notify. + client.shutdownOutput(); + // client.close(); + + // The connection should now be idle timed out by the server. + int read = input.read(); + assertEquals(-1, read); + } + } + finally + { + server.stop(); + } + } + + private void sleep(long time) throws IOException + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } +} diff --git a/jetty-server/src/test/resources/jetty-logging.properties b/jetty-server/src/test/resources/jetty-logging.properties index 581f7a2bbda..3f8368fa054 100644 --- a/jetty-server/src/test/resources/jetty-logging.properties +++ b/jetty-server/src/test/resources/jetty-logging.properties @@ -1,5 +1,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog #org.eclipse.jetty.LEVEL=DEBUG -#org.eclipse.jetty.io.ssl.LEVEL=DEBUG #org.eclipse.jetty.server.ConnectionLimit.LEVEL=DEBUG #org.eclipse.jetty.server.AcceptRateLimit.LEVEL=DEBUG \ No newline at end of file