lock all HttpInput methods to prevents concurrent threads to work on the same ByteBuffer

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-02-10 18:03:10 +01:00
parent f8bf885686
commit 6d9d5484a7
1 changed files with 190 additions and 127 deletions

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Destroyable; import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,13 +43,17 @@ public class HttpInput extends ServletInputStream implements Runnable
private boolean _consumedEof; private boolean _consumedEof;
private ReadListener _readListener; private ReadListener _readListener;
private long _contentConsumed; private long _contentConsumed;
private final AutoLock _lock = new AutoLock();
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
{ {
_channelState = state; try (AutoLock lock = _lock.lock())
_asyncContentProducer = new AsyncContentProducer(state.getHttpChannel()); {
_blockingContentProducer = new BlockingContentProducer(_asyncContentProducer); _channelState = state;
_contentProducer = _blockingContentProducer; _asyncContentProducer = new AsyncContentProducer(state.getHttpChannel());
_blockingContentProducer = new BlockingContentProducer(_asyncContentProducer);
_contentProducer = _blockingContentProducer;
}
} }
public void recycle() public void recycle()
@ -59,13 +64,16 @@ public class HttpInput extends ServletInputStream implements Runnable
public void reopen() public void reopen()
{ {
if (LOG.isDebugEnabled()) try (AutoLock lock = _lock.lock())
LOG.debug("reopen {}", this); {
_blockingContentProducer.recycle(); if (LOG.isDebugEnabled())
_contentProducer = _blockingContentProducer; LOG.debug("reopen {}", this);
_consumedEof = false; _blockingContentProducer.recycle();
_readListener = null; _contentProducer = _blockingContentProducer;
_contentConsumed = 0; _consumedEof = false;
_readListener = null;
_contentConsumed = 0;
}
} }
/** /**
@ -73,7 +81,10 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public Interceptor getInterceptor() public Interceptor getInterceptor()
{ {
return _contentProducer.getInterceptor(); try (AutoLock lock = _lock.lock())
{
return _contentProducer.getInterceptor();
}
} }
/** /**
@ -83,9 +94,12 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public void setInterceptor(Interceptor interceptor) public void setInterceptor(Interceptor interceptor)
{ {
if (LOG.isDebugEnabled()) try (AutoLock lock = _lock.lock())
LOG.debug("setting interceptor to {} on {}", interceptor, this); {
_contentProducer.setInterceptor(interceptor); if (LOG.isDebugEnabled())
LOG.debug("setting interceptor to {} on {}", interceptor, this);
_contentProducer.setInterceptor(interceptor);
}
} }
/** /**
@ -96,23 +110,26 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public void addInterceptor(Interceptor interceptor) public void addInterceptor(Interceptor interceptor)
{ {
Interceptor currentInterceptor = _contentProducer.getInterceptor(); try (AutoLock lock = _lock.lock())
if (currentInterceptor == null)
{ {
if (LOG.isDebugEnabled()) Interceptor currentInterceptor = _contentProducer.getInterceptor();
LOG.debug("adding single interceptor: {} on {}", interceptor, this); if (currentInterceptor == null)
_contentProducer.setInterceptor(interceptor); {
} if (LOG.isDebugEnabled())
else LOG.debug("adding single interceptor: {} on {}", interceptor, this);
{ _contentProducer.setInterceptor(interceptor);
ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); }
if (LOG.isDebugEnabled()) else
LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this); {
_contentProducer.setInterceptor(chainedInterceptor); ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor);
if (LOG.isDebugEnabled())
LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this);
_contentProducer.setInterceptor(chainedInterceptor);
}
} }
} }
public int get(Content content, byte[] bytes, int offset, int length) private int get(Content content, byte[] bytes, int offset, int length)
{ {
int consumed = content.get(bytes, offset, length); int consumed = content.get(bytes, offset, length);
_contentConsumed += consumed; _contentConsumed += consumed;
@ -121,42 +138,57 @@ public class HttpInput extends ServletInputStream implements Runnable
public long getContentConsumed() public long getContentConsumed()
{ {
return _contentConsumed; try (AutoLock lock = _lock.lock())
{
return _contentConsumed;
}
} }
public long getContentReceived() public long getContentReceived()
{ {
return _contentProducer.getRawContentArrived(); try (AutoLock lock = _lock.lock())
{
return _contentProducer.getRawContentArrived();
}
} }
public boolean consumeAll() public boolean consumeAll()
{ {
IOException failure = new IOException("Unconsumed content"); try (AutoLock lock = _lock.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("consumeAll {}", this, failure); IOException failure = new IOException("Unconsumed content");
boolean atEof = _contentProducer.consumeAll(failure); if (LOG.isDebugEnabled())
if (atEof) LOG.debug("consumeAll {}", this, failure);
_consumedEof = true; boolean atEof = _contentProducer.consumeAll(failure);
if (atEof)
_consumedEof = true;
if (isFinished()) if (isFinished())
return !isError(); return !isError();
return false; return false;
}
} }
public boolean isError() public boolean isError()
{ {
boolean error = _contentProducer.isError(); try (AutoLock lock = _lock.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("isError={} {}", error, this); boolean error = _contentProducer.isError();
return error; if (LOG.isDebugEnabled())
LOG.debug("isError={} {}", error, this);
return error;
}
} }
public boolean isAsync() public boolean isAsync()
{ {
if (LOG.isDebugEnabled()) try (AutoLock lock = _lock.lock())
LOG.debug("isAsync read listener {} {}", _readListener, this); {
return _readListener != null; if (LOG.isDebugEnabled())
LOG.debug("isAsync read listener {} {}", _readListener, this);
return _readListener != null;
}
} }
/* ServletInputStream */ /* ServletInputStream */
@ -164,94 +196,112 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public boolean isFinished() public boolean isFinished()
{ {
boolean finished = _consumedEof; try (AutoLock lock = _lock.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("isFinished={} {}", finished, this); boolean finished = _consumedEof;
return finished; if (LOG.isDebugEnabled())
LOG.debug("isFinished={} {}", finished, this);
return finished;
}
} }
@Override @Override
public boolean isReady() public boolean isReady()
{ {
boolean ready = _contentProducer.isReady(); try (AutoLock lock = _lock.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("isReady={} {}", ready, this); boolean ready = _contentProducer.isReady();
return ready; if (LOG.isDebugEnabled())
LOG.debug("isReady={} {}", ready, this);
return ready;
}
} }
@Override @Override
public void setReadListener(ReadListener readListener) public void setReadListener(ReadListener readListener)
{ {
if (LOG.isDebugEnabled()) try (AutoLock lock = _lock.lock())
LOG.debug("setting read listener to {} {}", readListener, this); {
if (_readListener != null) if (LOG.isDebugEnabled())
throw new IllegalStateException("ReadListener already set"); LOG.debug("setting read listener to {} {}", readListener, this);
_readListener = Objects.requireNonNull(readListener); if (_readListener != null)
//illegal if async not started throw new IllegalStateException("ReadListener already set");
if (!_channelState.isAsyncStarted()) _readListener = Objects.requireNonNull(readListener);
throw new IllegalStateException("Async not started"); //illegal if async not started
if (!_channelState.isAsyncStarted())
throw new IllegalStateException("Async not started");
_contentProducer = _asyncContentProducer; _contentProducer = _asyncContentProducer;
// trigger content production // trigger content production
if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN
scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead
}
} }
public boolean onContentProducible() public boolean onContentProducible()
{ {
return _contentProducer.onContentProducible(); try (AutoLock lock = _lock.lock())
{
return _contentProducer.onContentProducible();
}
} }
@Override @Override
public int read() throws IOException public int read() throws IOException
{ {
int read = read(_oneByteBuffer, 0, 1); try (AutoLock lock = _lock.lock())
if (read == 0) {
throw new IOException("unready read=0"); int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException public int read(byte[] b, int off, int len) throws IOException
{ {
// Calculate minimum request rate for DoS protection try (AutoLock lock = _lock.lock())
_contentProducer.checkMinDataRate();
Content content = _contentProducer.nextContent();
if (content == null)
throw new IllegalStateException("read on unready input");
if (!content.isSpecial())
{ {
int read = get(content, b, off, len); // Calculate minimum request rate for DoS protection
_contentProducer.checkMinDataRate();
Content content = _contentProducer.nextContent();
if (content == null)
throw new IllegalStateException("read on unready input");
if (!content.isSpecial())
{
int read = get(content, b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s) {}", read, this);
if (content.isEmpty())
_contentProducer.reclaim(content);
return read;
}
Throwable error = content.getError();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s) {}", read, this); LOG.debug("read error={} {}", error, this);
if (content.isEmpty()) if (error != null)
_contentProducer.reclaim(content); {
return read; if (error instanceof IOException)
} throw (IOException)error;
throw new IOException(error);
}
Throwable error = content.getError(); if (content.isEof())
if (LOG.isDebugEnabled()) {
LOG.debug("read error={} {}", error, this); if (LOG.isDebugEnabled())
if (error != null) LOG.debug("read at EOF, setting consumed EOF to true {}", this);
{ _consumedEof = true;
if (error instanceof IOException) // If EOF do we need to wake for allDataRead callback?
throw (IOException)error; if (onContentProducible())
throw new IOException(error); scheduleReadListenerNotification();
} return -1;
}
if (content.isEof()) throw new AssertionError("no data, no error and not EOF");
{
if (LOG.isDebugEnabled())
LOG.debug("read at EOF, setting consumed EOF to true {}", this);
_consumedEof = true;
// If EOF do we need to wake for allDataRead callback?
if (onContentProducible())
scheduleReadListenerNotification();
return -1;
} }
throw new AssertionError("no data, no error and not EOF");
} }
private void scheduleReadListenerNotification() private void scheduleReadListenerNotification()
@ -267,21 +317,27 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public boolean hasContent() public boolean hasContent()
{ {
// Do not call _contentProducer.available() as it calls HttpChannel.produceContent() try (AutoLock lock = _lock.lock())
// which is forbidden by this method's contract. {
boolean hasContent = _contentProducer.hasContent(); // Do not call _contentProducer.available() as it calls HttpChannel.produceContent()
if (LOG.isDebugEnabled()) // which is forbidden by this method's contract.
LOG.debug("hasContent={} {}", hasContent, this); boolean hasContent = _contentProducer.hasContent();
return hasContent; if (LOG.isDebugEnabled())
LOG.debug("hasContent={} {}", hasContent, this);
return hasContent;
}
} }
@Override @Override
public int available() public int available()
{ {
int available = _contentProducer.available(); try (AutoLock lock = _lock.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("available={} {}", available, this); int available = _contentProducer.available();
return available; if (LOG.isDebugEnabled())
LOG.debug("available={} {}", available, this);
return available;
}
} }
/* Runnable */ /* Runnable */
@ -293,19 +349,26 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public void run() public void run()
{ {
// Call isReady() to make sure that if not ready we register for fill interest. Content content;
if (!_contentProducer.isReady()) ReadListener readListener;
try (AutoLock lock = _lock.lock())
{ {
// Call isReady() to make sure that if not ready we register for fill interest.
if (!_contentProducer.isReady())
{
if (LOG.isDebugEnabled())
LOG.debug("running but not ready {}", this);
return;
}
content = _contentProducer.nextContent();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running but not ready {}", this); LOG.debug("running on content {} {}", content, this);
return;
readListener = this._readListener;
} }
Content content = _contentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("running on content {} {}", content, this);
// This check is needed when a request is started async but no read listener is registered. // This check is needed when a request is started async but no read listener is registered.
if (_readListener == null) if (readListener == null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running without a read listener {}", this); LOG.debug("running without a read listener {}", this);
@ -322,7 +385,7 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug("running error={} {}", error, this); LOG.debug("running error={} {}", error, this);
// TODO is this necessary to add here? // TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
_readListener.onError(error); readListener.onError(error);
} }
else if (content.isEof()) else if (content.isEof())
{ {
@ -330,13 +393,13 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running at EOF {}", this); LOG.debug("running at EOF {}", this);
_readListener.onAllDataRead(); readListener.onAllDataRead();
} }
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead {}", this, x); LOG.debug("running failed onAllDataRead {}", this, x);
_readListener.onError(x); readListener.onError(x);
} }
} }
} }
@ -346,13 +409,13 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug("running has content {}", this); LOG.debug("running has content {}", this);
try try
{ {
_readListener.onDataAvailable(); readListener.onDataAvailable();
} }
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable {}", this, x); LOG.debug("running failed onDataAvailable {}", this, x);
_readListener.onError(x); readListener.onError(x);
} }
} }
} }