rework HttpInput locking

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-02-11 10:23:18 +01:00
parent 6d9d5484a7
commit 1494a05327
1 changed files with 59 additions and 80 deletions

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
@ -38,22 +39,19 @@ public class HttpInput extends ServletInputStream implements Runnable
private final byte[] _oneByteBuffer = new byte[1];
private final BlockingContentProducer _blockingContentProducer;
private final AsyncContentProducer _asyncContentProducer;
private final AutoLock _contentProducerLock = new AutoLock();
private final HttpChannelState _channelState;
private ContentProducer _contentProducer;
private boolean _consumedEof;
private ReadListener _readListener;
private long _contentConsumed;
private final AutoLock _lock = new AutoLock();
private final LongAdder _contentConsumed = new LongAdder();
private volatile ContentProducer _contentProducer;
private volatile boolean _consumedEof;
private volatile ReadListener _readListener;
public HttpInput(HttpChannelState state)
{
try (AutoLock lock = _lock.lock())
{
_channelState = state;
_asyncContentProducer = new AsyncContentProducer(state.getHttpChannel());
_blockingContentProducer = new BlockingContentProducer(_asyncContentProducer);
_contentProducer = _blockingContentProducer;
}
_channelState = state;
_asyncContentProducer = new AsyncContentProducer(state.getHttpChannel());
_blockingContentProducer = new BlockingContentProducer(_asyncContentProducer);
_contentProducer = _blockingContentProducer;
}
public void recycle()
@ -64,16 +62,13 @@ public class HttpInput extends ServletInputStream implements Runnable
public void reopen()
{
try (AutoLock lock = _lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("reopen {}", this);
_blockingContentProducer.recycle();
_contentProducer = _blockingContentProducer;
_consumedEof = false;
_readListener = null;
_contentConsumed = 0;
}
if (LOG.isDebugEnabled())
LOG.debug("reopen {}", this);
_blockingContentProducer.recycle();
_contentProducer = _blockingContentProducer;
_consumedEof = false;
_readListener = null;
_contentConsumed.reset();
}
/**
@ -81,7 +76,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*/
public Interceptor getInterceptor()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
return _contentProducer.getInterceptor();
}
@ -94,7 +89,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*/
public void setInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("setting interceptor to {} on {}", interceptor, this);
@ -110,7 +105,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*/
public void addInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
Interceptor currentInterceptor = _contentProducer.getInterceptor();
if (currentInterceptor == null)
@ -132,21 +127,18 @@ public class HttpInput extends ServletInputStream implements Runnable
private int get(Content content, byte[] bytes, int offset, int length)
{
int consumed = content.get(bytes, offset, length);
_contentConsumed += consumed;
_contentConsumed.add(consumed);
return consumed;
}
public long getContentConsumed()
{
try (AutoLock lock = _lock.lock())
{
return _contentConsumed;
}
return _contentConsumed.sum();
}
public long getContentReceived()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
return _contentProducer.getRawContentArrived();
}
@ -154,7 +146,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean consumeAll()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
IOException failure = new IOException("Unconsumed content");
if (LOG.isDebugEnabled())
@ -172,7 +164,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean isError()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
boolean error = _contentProducer.isError();
if (LOG.isDebugEnabled())
@ -183,12 +175,9 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean isAsync()
{
try (AutoLock lock = _lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("isAsync read listener {} {}", _readListener, this);
return _readListener != null;
}
if (LOG.isDebugEnabled())
LOG.debug("isAsync read listener {} {}", _readListener, this);
return _readListener != null;
}
/* ServletInputStream */
@ -196,19 +185,16 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isFinished()
{
try (AutoLock lock = _lock.lock())
{
boolean finished = _consumedEof;
if (LOG.isDebugEnabled())
LOG.debug("isFinished={} {}", finished, this);
return finished;
}
boolean finished = _consumedEof;
if (LOG.isDebugEnabled())
LOG.debug("isFinished={} {}", finished, this);
return finished;
}
@Override
public boolean isReady()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
boolean ready = _contentProducer.isReady();
if (LOG.isDebugEnabled())
@ -220,36 +206,32 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public void setReadListener(ReadListener readListener)
{
try (AutoLock lock = _lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("setting read listener to {} {}", readListener, this);
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
//illegal if async not started
if (!_channelState.isAsyncStarted())
throw new IllegalStateException("Async not started");
if (LOG.isDebugEnabled())
LOG.debug("setting read listener to {} {}", readListener, this);
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
//illegal if async not started
if (!_channelState.isAsyncStarted())
throw new IllegalStateException("Async not started");
_contentProducer = _asyncContentProducer;
// trigger content production
if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN
scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead
}
_contentProducer = _asyncContentProducer;
// trigger content production
if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN
scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead
}
public boolean onContentProducible()
{
try (AutoLock lock = _lock.lock())
{
return _contentProducer.onContentProducible();
}
// This is the only _contentProducer method call that must not
// happen under a lock as it is used to release blocking calls.
return _contentProducer.onContentProducible();
}
@Override
public int read() throws IOException
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
@ -261,7 +243,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int read(byte[] b, int off, int len) throws IOException
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
// Calculate minimum request rate for DoS protection
_contentProducer.checkMinDataRate();
@ -317,7 +299,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*/
public boolean hasContent()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
// Do not call _contentProducer.available() as it calls HttpChannel.produceContent()
// which is forbidden by this method's contract.
@ -331,7 +313,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int available()
{
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
@ -350,8 +332,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public void run()
{
Content content;
ReadListener readListener;
try (AutoLock lock = _lock.lock())
try (AutoLock lock = _contentProducerLock.lock())
{
// Call isReady() to make sure that if not ready we register for fill interest.
if (!_contentProducer.isReady())
@ -363,12 +344,10 @@ public class HttpInput extends ServletInputStream implements Runnable
content = _contentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("running on content {} {}", content, this);
readListener = this._readListener;
}
// This check is needed when a request is started async but no read listener is registered.
if (readListener == null)
if (_readListener == null)
{
if (LOG.isDebugEnabled())
LOG.debug("running without a read listener {}", this);
@ -385,7 +364,7 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug("running error={} {}", error, this);
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
readListener.onError(error);
_readListener.onError(error);
}
else if (content.isEof())
{
@ -393,13 +372,13 @@ public class HttpInput extends ServletInputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("running at EOF {}", this);
readListener.onAllDataRead();
_readListener.onAllDataRead();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead {}", this, x);
readListener.onError(x);
_readListener.onError(x);
}
}
}
@ -409,13 +388,13 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug("running has content {}", this);
try
{
readListener.onDataAvailable();
_readListener.onDataAvailable();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable {}", this, x);
readListener.onError(x);
_readListener.onError(x);
}
}
}