internalize the lock within the content producer
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
59b397b1a0
commit
31eedec1ed
|
@ -22,13 +22,14 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent(AutoLock)} will never block
|
||||
* Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent()} will never block
|
||||
* but will return null when there is no available content.
|
||||
*/
|
||||
class AsyncContentProducer implements ContentProducer
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
|
||||
|
||||
private final AutoLock _lock = new AutoLock();
|
||||
private final HttpChannel _httpChannel;
|
||||
private HttpInput.Interceptor _interceptor;
|
||||
private HttpInput.Content _rawContent;
|
||||
|
@ -42,9 +43,16 @@ class AsyncContentProducer implements ContentProducer
|
|||
_httpChannel = httpChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoLock lock()
|
||||
{
|
||||
return _lock.lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recycle()
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("recycling {}", this);
|
||||
_interceptor = null;
|
||||
|
@ -58,18 +66,21 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public HttpInput.Interceptor getInterceptor()
|
||||
{
|
||||
assertLocked();
|
||||
return _interceptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInterceptor(HttpInput.Interceptor interceptor)
|
||||
{
|
||||
assertLocked();
|
||||
this._interceptor = interceptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available()
|
||||
{
|
||||
assertLocked();
|
||||
HttpInput.Content content = nextTransformedContent();
|
||||
int available = content == null ? 0 : content.remaining();
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -80,6 +91,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public boolean hasContent()
|
||||
{
|
||||
assertLocked();
|
||||
boolean hasContent = _rawContent != null;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("hasContent = {} {}", hasContent, this);
|
||||
|
@ -89,6 +101,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public boolean isError()
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isError = {} {}", _error, this);
|
||||
return _error;
|
||||
|
@ -97,6 +110,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public void checkMinDataRate()
|
||||
{
|
||||
assertLocked();
|
||||
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
|
||||
|
@ -128,6 +142,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public long getRawContentArrived()
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
|
||||
return _rawContentArrived;
|
||||
|
@ -136,6 +151,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public boolean consumeAll(Throwable x)
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("consumeAll [e={}] {}", x, this);
|
||||
failCurrentContent(x);
|
||||
|
@ -187,17 +203,16 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public boolean onContentProducible()
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onContentProducible {}", this);
|
||||
return _httpChannel.getState().onReadReady();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpInput.Content nextContent(AutoLock lock)
|
||||
public HttpInput.Content nextContent()
|
||||
{
|
||||
if (!lock.isHeldByCurrentThread())
|
||||
throw new IllegalStateException("nextContent must be called with the lock held");
|
||||
|
||||
assertLocked();
|
||||
HttpInput.Content content = nextTransformedContent();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("nextContent = {} {}", content, this);
|
||||
|
@ -209,6 +224,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public void reclaim(HttpInput.Content content)
|
||||
{
|
||||
assertLocked();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("reclaim {} {}", content, this);
|
||||
if (_transformedContent == content)
|
||||
|
@ -223,6 +239,7 @@ class AsyncContentProducer implements ContentProducer
|
|||
@Override
|
||||
public boolean isReady()
|
||||
{
|
||||
assertLocked();
|
||||
HttpInput.Content content = nextTransformedContent();
|
||||
if (content != null)
|
||||
{
|
||||
|
@ -367,6 +384,12 @@ class AsyncContentProducer implements ContentProducer
|
|||
return content;
|
||||
}
|
||||
|
||||
private void assertLocked()
|
||||
{
|
||||
if (!_lock.isHeldByCurrentThread())
|
||||
throw new IllegalStateException("nextContent must be called within a critical block");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent(AutoLock)} will block when
|
||||
* Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent()} will block when
|
||||
* there is no available content but will never return null.
|
||||
*/
|
||||
class BlockingContentProducer implements ContentProducer
|
||||
|
@ -35,6 +35,12 @@ class BlockingContentProducer implements ContentProducer
|
|||
_asyncContentProducer = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoLock lock()
|
||||
{
|
||||
return _asyncContentProducer.lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recycle()
|
||||
{
|
||||
|
@ -83,11 +89,11 @@ class BlockingContentProducer implements ContentProducer
|
|||
}
|
||||
|
||||
@Override
|
||||
public HttpInput.Content nextContent(AutoLock lock)
|
||||
public HttpInput.Content nextContent()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
HttpInput.Content content = _asyncContentProducer.nextContent(lock);
|
||||
HttpInput.Content content = _asyncContentProducer.nextContent();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("nextContent async producer returned {}", content);
|
||||
if (content != null)
|
||||
|
@ -104,18 +110,19 @@ class BlockingContentProducer implements ContentProducer
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("nextContent async producer is not ready, waiting on semaphore {}", _semaphore);
|
||||
|
||||
int lockReentranceCount = 0;
|
||||
AutoLock lock = _asyncContentProducer.lock();
|
||||
// Start the counter at -1 b/c acquiring the lock increases the hold count by 1.
|
||||
int lockHoldCount = -1;
|
||||
try
|
||||
{
|
||||
// Do not hold the lock during the wait on the semaphore.
|
||||
// The lock must be unlocked more than once because it is
|
||||
// reentrant so it fan be acquired multiple times by the
|
||||
// same thread, so it can still be held by the thread after
|
||||
// a single unlock call.
|
||||
// Do not hold the lock during the wait on the semaphore;
|
||||
// the lock must be unlocked more than once because it is
|
||||
// reentrant so it can be acquired multiple times by the same
|
||||
// thread, hence it can still be held after a single unlock.
|
||||
while (lock.isHeldByCurrentThread())
|
||||
{
|
||||
lock.close();
|
||||
lockReentranceCount++;
|
||||
lockHoldCount++;
|
||||
}
|
||||
_semaphore.acquire();
|
||||
}
|
||||
|
@ -127,7 +134,7 @@ class BlockingContentProducer implements ContentProducer
|
|||
{
|
||||
// Re-lock the lock as many times as it was held
|
||||
// before the unlock preceding the semaphore acquisition.
|
||||
for (int i = 0; i < lockReentranceCount; i++)
|
||||
for (int i = 0; i < lockHoldCount; i++)
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,13 @@ import org.eclipse.jetty.util.thread.AutoLock;
|
|||
*/
|
||||
public interface ContentProducer
|
||||
{
|
||||
/**
|
||||
* Lock this instance. The lock must be held before any method of this instance's
|
||||
* method be called, and must be manually released afterward.
|
||||
* @return the lock that is guarding this instance.
|
||||
*/
|
||||
AutoLock lock();
|
||||
|
||||
/**
|
||||
* Reset all internal state and clear any held resources.
|
||||
*/
|
||||
|
@ -96,10 +103,8 @@ public interface ContentProducer
|
|||
* After this call, state can be either of UNREADY or IDLE.
|
||||
* @return the next content that can be read from or null if the implementation does not block
|
||||
* and has no available content.
|
||||
* @param lock The lock that is currently held. It will be released if this call has to block for the
|
||||
* duration of the internal blocking.
|
||||
*/
|
||||
HttpInput.Content nextContent(AutoLock lock);
|
||||
HttpInput.Content nextContent();
|
||||
|
||||
/**
|
||||
* Free up the content by calling {@link HttpInput.Content#succeeded()} on it
|
||||
|
@ -109,7 +114,7 @@ public interface ContentProducer
|
|||
|
||||
/**
|
||||
* Check if this {@link ContentProducer} instance has some content that can be read without blocking.
|
||||
* If there is some, the next call to {@link #nextContent(AutoLock)} will not block.
|
||||
* If there is some, the next call to {@link #nextContent()} will not block.
|
||||
* If there isn't any and the implementation does not block, this method will trigger a
|
||||
* {@link javax.servlet.ReadListener} callback once some content is available.
|
||||
* This call is always non-blocking.
|
||||
|
|
|
@ -39,7 +39,6 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
private final byte[] _oneByteBuffer = new byte[1];
|
||||
private final BlockingContentProducer _blockingContentProducer;
|
||||
private final AsyncContentProducer _asyncContentProducer;
|
||||
private final AutoLock _contentProducerLock = new AutoLock();
|
||||
private final HttpChannelState _channelState;
|
||||
private final LongAdder _contentConsumed = new LongAdder();
|
||||
private volatile ContentProducer _contentProducer;
|
||||
|
@ -62,13 +61,16 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
public void reopen()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("reopen {}", this);
|
||||
_blockingContentProducer.recycle();
|
||||
_contentProducer = _blockingContentProducer;
|
||||
_consumedEof = false;
|
||||
_readListener = null;
|
||||
_contentConsumed.reset();
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("reopen {}", this);
|
||||
_blockingContentProducer.recycle();
|
||||
_contentProducer = _blockingContentProducer;
|
||||
_consumedEof = false;
|
||||
_readListener = null;
|
||||
_contentConsumed.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,7 +78,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
*/
|
||||
public Interceptor getInterceptor()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
return _contentProducer.getInterceptor();
|
||||
}
|
||||
|
@ -89,7 +91,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
*/
|
||||
public void setInterceptor(Interceptor interceptor)
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("setting interceptor to {} on {}", interceptor, this);
|
||||
|
@ -105,7 +107,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
*/
|
||||
public void addInterceptor(Interceptor interceptor)
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
Interceptor currentInterceptor = _contentProducer.getInterceptor();
|
||||
if (currentInterceptor == null)
|
||||
|
@ -138,7 +140,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
public long getContentReceived()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
return _contentProducer.getRawContentArrived();
|
||||
}
|
||||
|
@ -146,7 +148,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
public boolean consumeAll()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
IOException failure = new IOException("Unconsumed content");
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -164,7 +166,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
public boolean isError()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
boolean error = _contentProducer.isError();
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -194,7 +196,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
@Override
|
||||
public boolean isReady()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
boolean ready = _contentProducer.isReady();
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -223,15 +225,16 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
public boolean onContentProducible()
|
||||
{
|
||||
// This is the only _contentProducer method call that must not
|
||||
// happen under a lock as it is used to release blocking calls.
|
||||
return _contentProducer.onContentProducible();
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
return _contentProducer.onContentProducible();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
int read = read(_oneByteBuffer, 0, 1);
|
||||
if (read == 0)
|
||||
|
@ -243,12 +246,12 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
// Calculate minimum request rate for DoS protection
|
||||
_contentProducer.checkMinDataRate();
|
||||
|
||||
Content content = _contentProducer.nextContent(lock);
|
||||
Content content = _contentProducer.nextContent();
|
||||
if (content == null)
|
||||
throw new IllegalStateException("read on unready input");
|
||||
if (!content.isSpecial())
|
||||
|
@ -299,7 +302,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
*/
|
||||
public boolean hasContent()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
// Do not call _contentProducer.available() as it calls HttpChannel.produceContent()
|
||||
// which is forbidden by this method's contract.
|
||||
|
@ -313,7 +316,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
@Override
|
||||
public int available()
|
||||
{
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
int available = _contentProducer.available();
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -332,7 +335,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
public void run()
|
||||
{
|
||||
Content content;
|
||||
try (AutoLock lock = _contentProducerLock.lock())
|
||||
try (AutoLock lock = _contentProducer.lock())
|
||||
{
|
||||
// Call isReady() to make sure that if not ready we register for fill interest.
|
||||
if (!_contentProducer.isReady())
|
||||
|
@ -341,7 +344,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
LOG.debug("running but not ready {}", this);
|
||||
return;
|
||||
}
|
||||
content = _contentProducer.nextContent(lock);
|
||||
content = _contentProducer.nextContent();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("running on content {} {}", content, this);
|
||||
}
|
||||
|
|
|
@ -72,8 +72,11 @@ public class AsyncContentProducerTest
|
|||
|
||||
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -91,8 +94,11 @@ public class AsyncContentProducerTest
|
|||
|
||||
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, Is.is(expectedError));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, Is.is(expectedError));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -113,10 +119,13 @@ public class AsyncContentProducerTest
|
|||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -137,10 +146,13 @@ public class AsyncContentProducerTest
|
|||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -162,10 +174,13 @@ public class AsyncContentProducerTest
|
|||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, Is.is(expectedError));
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
|
||||
assertThat(error, Is.is(expectedError));
|
||||
}
|
||||
}
|
||||
|
||||
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException
|
||||
|
@ -176,7 +191,6 @@ public class AsyncContentProducerTest
|
|||
int isReadyFalseCount = 0;
|
||||
int isReadyTrueCount = 0;
|
||||
Throwable error = null;
|
||||
AutoLock lock = new AutoLock();
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -185,17 +199,13 @@ public class AsyncContentProducerTest
|
|||
else
|
||||
isReadyFalseCount++;
|
||||
|
||||
HttpInput.Content content;
|
||||
try (AutoLock autoLock = lock.lock())
|
||||
HttpInput.Content content = contentProducer.nextContent();
|
||||
nextContentCount++;
|
||||
if (content == null)
|
||||
{
|
||||
content = contentProducer.nextContent(lock);
|
||||
barrier.await(5, TimeUnit.SECONDS);
|
||||
content = contentProducer.nextContent();
|
||||
nextContentCount++;
|
||||
if (content == null)
|
||||
{
|
||||
barrier.await(5, TimeUnit.SECONDS);
|
||||
content = contentProducer.nextContent(lock);
|
||||
nextContentCount++;
|
||||
}
|
||||
}
|
||||
assertThat(content, notNullValue());
|
||||
|
||||
|
|
|
@ -69,8 +69,11 @@ public class BlockingContentProducerTest
|
|||
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
|
||||
ref.set(contentProducer);
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -87,10 +90,13 @@ public class BlockingContentProducerTest
|
|||
AtomicReference<ContentProducer> ref = new AtomicReference<>();
|
||||
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible());
|
||||
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
|
||||
ref.set(contentProducer);
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
ref.set(contentProducer);
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, is(expectedError));
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, is(expectedError));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -112,10 +118,13 @@ public class BlockingContentProducerTest
|
|||
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible());
|
||||
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
|
||||
ref.set(contentProducer);
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -137,10 +146,13 @@ public class BlockingContentProducerTest
|
|||
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible());
|
||||
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
|
||||
ref.set(contentProducer);
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer);
|
||||
assertThat(error, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -163,10 +175,13 @@ public class BlockingContentProducerTest
|
|||
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible());
|
||||
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
|
||||
ref.set(contentProducer);
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
try (AutoLock lock = contentProducer.lock())
|
||||
{
|
||||
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
|
||||
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, is(expectedError));
|
||||
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
|
||||
assertThat(error, is(expectedError));
|
||||
}
|
||||
}
|
||||
|
||||
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer)
|
||||
|
@ -175,14 +190,9 @@ public class BlockingContentProducerTest
|
|||
int nextContentCount = 0;
|
||||
String consumedString = "";
|
||||
Throwable error = null;
|
||||
AutoLock lock = new AutoLock();
|
||||
while (true)
|
||||
{
|
||||
HttpInput.Content content;
|
||||
try (AutoLock autoLock = lock.lock())
|
||||
{
|
||||
content = contentProducer.nextContent(lock);
|
||||
}
|
||||
HttpInput.Content content = contentProducer.nextContent();
|
||||
nextContentCount++;
|
||||
|
||||
if (content.isSpecial())
|
||||
|
|
Loading…
Reference in New Issue