diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index dcdd99b1e4b..8336b244334 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; @@ -38,22 +39,19 @@ public class HttpInput extends ServletInputStream implements Runnable private final byte[] _oneByteBuffer = new byte[1]; private final BlockingContentProducer _blockingContentProducer; private final AsyncContentProducer _asyncContentProducer; + private final AutoLock _contentProducerLock = new AutoLock(); private final HttpChannelState _channelState; - private ContentProducer _contentProducer; - private boolean _consumedEof; - private ReadListener _readListener; - private long _contentConsumed; - private final AutoLock _lock = new AutoLock(); + private final LongAdder _contentConsumed = new LongAdder(); + private volatile ContentProducer _contentProducer; + private volatile boolean _consumedEof; + private volatile ReadListener _readListener; public HttpInput(HttpChannelState state) { - try (AutoLock lock = _lock.lock()) - { - _channelState = state; - _asyncContentProducer = new AsyncContentProducer(state.getHttpChannel()); - _blockingContentProducer = new BlockingContentProducer(_asyncContentProducer); - _contentProducer = _blockingContentProducer; - } + _channelState = state; + _asyncContentProducer = new AsyncContentProducer(state.getHttpChannel()); + _blockingContentProducer = new BlockingContentProducer(_asyncContentProducer); + _contentProducer = _blockingContentProducer; } public void recycle() @@ -64,16 +62,13 @@ public class HttpInput extends ServletInputStream implements Runnable public void reopen() { - try (AutoLock lock = _lock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("reopen {}", this); - _blockingContentProducer.recycle(); - _contentProducer = _blockingContentProducer; - _consumedEof = false; - _readListener = null; - _contentConsumed = 0; - } + if (LOG.isDebugEnabled()) + LOG.debug("reopen {}", this); + _blockingContentProducer.recycle(); + _contentProducer = _blockingContentProducer; + _consumedEof = false; + _readListener = null; + _contentConsumed.reset(); } /** @@ -81,7 +76,7 @@ public class HttpInput extends ServletInputStream implements Runnable */ public Interceptor getInterceptor() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { return _contentProducer.getInterceptor(); } @@ -94,7 +89,7 @@ public class HttpInput extends ServletInputStream implements Runnable */ public void setInterceptor(Interceptor interceptor) { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { if (LOG.isDebugEnabled()) LOG.debug("setting interceptor to {} on {}", interceptor, this); @@ -110,7 +105,7 @@ public class HttpInput extends ServletInputStream implements Runnable */ public void addInterceptor(Interceptor interceptor) { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { Interceptor currentInterceptor = _contentProducer.getInterceptor(); if (currentInterceptor == null) @@ -132,21 +127,18 @@ public class HttpInput extends ServletInputStream implements Runnable private int get(Content content, byte[] bytes, int offset, int length) { int consumed = content.get(bytes, offset, length); - _contentConsumed += consumed; + _contentConsumed.add(consumed); return consumed; } public long getContentConsumed() { - try (AutoLock lock = _lock.lock()) - { - return _contentConsumed; - } + return _contentConsumed.sum(); } public long getContentReceived() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { return _contentProducer.getRawContentArrived(); } @@ -154,7 +146,7 @@ public class HttpInput extends ServletInputStream implements Runnable public boolean consumeAll() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { IOException failure = new IOException("Unconsumed content"); if (LOG.isDebugEnabled()) @@ -172,7 +164,7 @@ public class HttpInput extends ServletInputStream implements Runnable public boolean isError() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { boolean error = _contentProducer.isError(); if (LOG.isDebugEnabled()) @@ -183,12 +175,9 @@ public class HttpInput extends ServletInputStream implements Runnable public boolean isAsync() { - try (AutoLock lock = _lock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("isAsync read listener {} {}", _readListener, this); - return _readListener != null; - } + if (LOG.isDebugEnabled()) + LOG.debug("isAsync read listener {} {}", _readListener, this); + return _readListener != null; } /* ServletInputStream */ @@ -196,19 +185,16 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public boolean isFinished() { - try (AutoLock lock = _lock.lock()) - { - boolean finished = _consumedEof; - if (LOG.isDebugEnabled()) - LOG.debug("isFinished={} {}", finished, this); - return finished; - } + boolean finished = _consumedEof; + if (LOG.isDebugEnabled()) + LOG.debug("isFinished={} {}", finished, this); + return finished; } @Override public boolean isReady() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { boolean ready = _contentProducer.isReady(); if (LOG.isDebugEnabled()) @@ -220,36 +206,32 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public void setReadListener(ReadListener readListener) { - try (AutoLock lock = _lock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("setting read listener to {} {}", readListener, this); - if (_readListener != null) - throw new IllegalStateException("ReadListener already set"); - _readListener = Objects.requireNonNull(readListener); - //illegal if async not started - if (!_channelState.isAsyncStarted()) - throw new IllegalStateException("Async not started"); + if (LOG.isDebugEnabled()) + LOG.debug("setting read listener to {} {}", readListener, this); + if (_readListener != null) + throw new IllegalStateException("ReadListener already set"); + _readListener = Objects.requireNonNull(readListener); + //illegal if async not started + if (!_channelState.isAsyncStarted()) + throw new IllegalStateException("Async not started"); - _contentProducer = _asyncContentProducer; - // trigger content production - if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN - scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead - } + _contentProducer = _asyncContentProducer; + // trigger content production + if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN + scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead } public boolean onContentProducible() { - try (AutoLock lock = _lock.lock()) - { - return _contentProducer.onContentProducible(); - } + // This is the only _contentProducer method call that must not + // happen under a lock as it is used to release blocking calls. + return _contentProducer.onContentProducible(); } @Override public int read() throws IOException { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { int read = read(_oneByteBuffer, 0, 1); if (read == 0) @@ -261,7 +243,7 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public int read(byte[] b, int off, int len) throws IOException { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { // Calculate minimum request rate for DoS protection _contentProducer.checkMinDataRate(); @@ -317,7 +299,7 @@ public class HttpInput extends ServletInputStream implements Runnable */ public boolean hasContent() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { // Do not call _contentProducer.available() as it calls HttpChannel.produceContent() // which is forbidden by this method's contract. @@ -331,7 +313,7 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public int available() { - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { int available = _contentProducer.available(); if (LOG.isDebugEnabled()) @@ -350,8 +332,7 @@ public class HttpInput extends ServletInputStream implements Runnable public void run() { Content content; - ReadListener readListener; - try (AutoLock lock = _lock.lock()) + try (AutoLock lock = _contentProducerLock.lock()) { // Call isReady() to make sure that if not ready we register for fill interest. if (!_contentProducer.isReady()) @@ -363,12 +344,10 @@ public class HttpInput extends ServletInputStream implements Runnable content = _contentProducer.nextContent(); if (LOG.isDebugEnabled()) LOG.debug("running on content {} {}", content, this); - - readListener = this._readListener; } // This check is needed when a request is started async but no read listener is registered. - if (readListener == null) + if (_readListener == null) { if (LOG.isDebugEnabled()) LOG.debug("running without a read listener {}", this); @@ -385,7 +364,7 @@ public class HttpInput extends ServletInputStream implements Runnable LOG.debug("running error={} {}", error, this); // TODO is this necessary to add here? _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - readListener.onError(error); + _readListener.onError(error); } else if (content.isEof()) { @@ -393,13 +372,13 @@ public class HttpInput extends ServletInputStream implements Runnable { if (LOG.isDebugEnabled()) LOG.debug("running at EOF {}", this); - readListener.onAllDataRead(); + _readListener.onAllDataRead(); } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("running failed onAllDataRead {}", this, x); - readListener.onError(x); + _readListener.onError(x); } } } @@ -409,13 +388,13 @@ public class HttpInput extends ServletInputStream implements Runnable LOG.debug("running has content {}", this); try { - readListener.onDataAvailable(); + _readListener.onDataAvailable(); } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("running failed onDataAvailable {}", this, x); - readListener.onError(x); + _readListener.onError(x); } } }