Merge pull request #5953 from eclipse/jetty-10.0.x-5605-wakeup-blocked-threads

Jetty 10.0.x Fix #5605 Unblock non container Threads
This commit is contained in:
Ludovic Orban 2021-02-17 14:46:56 +01:00 committed by GitHub
commit 4cfe7b9e3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1265 additions and 209 deletions

View File

@ -14,20 +14,23 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block * Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent()} will never block
* but will return null when there is no available content. * but will return null when there is no available content.
*/ */
class AsyncContentProducer implements ContentProducer class AsyncContentProducer implements ContentProducer
{ {
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class); private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private final AutoLock _lock = new AutoLock();
private final HttpChannel _httpChannel; private final HttpChannel _httpChannel;
private HttpInput.Interceptor _interceptor; private HttpInput.Interceptor _interceptor;
private HttpInput.Content _rawContent; private HttpInput.Content _rawContent;
@ -41,9 +44,16 @@ class AsyncContentProducer implements ContentProducer
_httpChannel = httpChannel; _httpChannel = httpChannel;
} }
@Override
public AutoLock lock()
{
return _lock.lock();
}
@Override @Override
public void recycle() public void recycle()
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this); LOG.debug("recycling {}", this);
_interceptor = null; _interceptor = null;
@ -57,18 +67,21 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public HttpInput.Interceptor getInterceptor() public HttpInput.Interceptor getInterceptor()
{ {
assertLocked();
return _interceptor; return _interceptor;
} }
@Override @Override
public void setInterceptor(HttpInput.Interceptor interceptor) public void setInterceptor(HttpInput.Interceptor interceptor)
{ {
assertLocked();
this._interceptor = interceptor; this._interceptor = interceptor;
} }
@Override @Override
public int available() public int available()
{ {
assertLocked();
HttpInput.Content content = nextTransformedContent(); HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining(); int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -79,6 +92,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public boolean hasContent() public boolean hasContent()
{ {
assertLocked();
boolean hasContent = _rawContent != null; boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("hasContent = {} {}", hasContent, this); LOG.debug("hasContent = {} {}", hasContent, this);
@ -88,6 +102,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public boolean isError() public boolean isError()
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("isError = {} {}", _error, this); LOG.debug("isError = {} {}", _error, this);
return _error; return _error;
@ -96,6 +111,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public void checkMinDataRate() public void checkMinDataRate()
{ {
assertLocked();
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate(); long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this); LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
@ -127,6 +143,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public long getRawContentArrived() public long getRawContentArrived()
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this); LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
return _rawContentArrived; return _rawContentArrived;
@ -135,6 +152,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public boolean consumeAll(Throwable x) public boolean consumeAll(Throwable x)
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}] {}", x, this); LOG.debug("consumeAll [e={}] {}", x, this);
failCurrentContent(x); failCurrentContent(x);
@ -177,11 +195,16 @@ class AsyncContentProducer implements ContentProducer
_rawContent.failed(x); _rawContent.failed(x);
_rawContent = null; _rawContent = null;
} }
HttpInput.ErrorContent errorContent = new HttpInput.ErrorContent(x);
_transformedContent = errorContent;
_rawContent = errorContent;
} }
@Override @Override
public boolean onContentProducible() public boolean onContentProducible()
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onContentProducible {}", this); LOG.debug("onContentProducible {}", this);
return _httpChannel.getState().onReadReady(); return _httpChannel.getState().onReadReady();
@ -190,6 +213,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public HttpInput.Content nextContent() public HttpInput.Content nextContent()
{ {
assertLocked();
HttpInput.Content content = nextTransformedContent(); HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("nextContent = {} {}", content, this); LOG.debug("nextContent = {} {}", content, this);
@ -201,6 +225,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public void reclaim(HttpInput.Content content) public void reclaim(HttpInput.Content content)
{ {
assertLocked();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("reclaim {} {}", content, this); LOG.debug("reclaim {} {}", content, this);
if (_transformedContent == content) if (_transformedContent == content)
@ -215,6 +240,7 @@ class AsyncContentProducer implements ContentProducer
@Override @Override
public boolean isReady() public boolean isReady()
{ {
assertLocked();
HttpInput.Content content = nextTransformedContent(); HttpInput.Content content = nextTransformedContent();
if (content != null) if (content != null)
{ {
@ -274,6 +300,13 @@ class AsyncContentProducer implements ContentProducer
{ {
// TODO does EOF need to be passed to the interceptors? // TODO does EOF need to be passed to the interceptors?
// In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel.
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;
_error = _rawContent.getError() != null; _error = _rawContent.getError() != null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this); LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
@ -352,6 +385,12 @@ class AsyncContentProducer implements ContentProducer
return content; return content;
} }
private void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("ContentProducer must be called within lock scope");
}
@Override @Override
public String toString() public String toString()
{ {
@ -365,4 +404,53 @@ class AsyncContentProducer implements ContentProducer
_httpChannel _httpChannel
); );
} }
LockedSemaphore newLockedSemaphore()
{
return new LockedSemaphore();
}
/**
* A semaphore that assumes working under {@link AsyncContentProducer#lock()} scope.
*/
class LockedSemaphore
{
private final Condition _condition;
private int _permits;
private LockedSemaphore()
{
this._condition = _lock.newCondition();
}
void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("LockedSemaphore must be called within lock scope");
}
void drainPermits()
{
_permits = 0;
}
void acquire() throws InterruptedException
{
while (_permits == 0)
_condition.await();
_permits--;
}
void release()
{
_permits++;
_condition.signal();
}
@Override
public String toString()
{
return getClass().getSimpleName() + " permits=" + _permits;
}
}
} }

View File

@ -13,25 +13,31 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.util.concurrent.Semaphore; import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when * Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent()} will block when
* there is no available content but will never return null. * there is no available content but will never return null.
*/ */
class BlockingContentProducer implements ContentProducer class BlockingContentProducer implements ContentProducer
{ {
private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class); private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class);
private final Semaphore _semaphore = new Semaphore(0);
private final AsyncContentProducer _asyncContentProducer; private final AsyncContentProducer _asyncContentProducer;
private final AsyncContentProducer.LockedSemaphore _semaphore;
BlockingContentProducer(AsyncContentProducer delegate) BlockingContentProducer(AsyncContentProducer delegate)
{ {
_asyncContentProducer = delegate; _asyncContentProducer = delegate;
_semaphore = _asyncContentProducer.newLockedSemaphore();
}
@Override
public AutoLock lock()
{
return _asyncContentProducer.lock();
} }
@Override @Override
@ -76,7 +82,9 @@ class BlockingContentProducer implements ContentProducer
@Override @Override
public boolean consumeAll(Throwable x) public boolean consumeAll(Throwable x)
{ {
return _asyncContentProducer.consumeAll(x); boolean eof = _asyncContentProducer.consumeAll(x);
_semaphore.release();
return eof;
} }
@Override @Override
@ -142,6 +150,7 @@ class BlockingContentProducer implements ContentProducer
@Override @Override
public boolean onContentProducible() public boolean onContentProducible()
{ {
_semaphore.assertLocked();
// In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state // In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state
// DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why // DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why
// this method always returns false. // this method always returns false.

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import org.eclipse.jetty.util.thread.AutoLock;
/** /**
* ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}. * ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}.
* It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()}, * It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()},
@ -24,6 +26,13 @@ package org.eclipse.jetty.server;
*/ */
public interface ContentProducer public interface ContentProducer
{ {
/**
* Lock this instance. The lock must be held before any method of this instance's
* method be called, and must be manually released afterward.
* @return the lock that is guarding this instance.
*/
AutoLock lock();
/** /**
* Reset all internal state and clear any held resources. * Reset all internal state and clear any held resources.
*/ */

View File

@ -701,9 +701,20 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
} }
if (isCommitted()) if (isCommitted())
{
abort(failure); abort(failure);
}
else else
_state.onError(failure); {
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
}
} }
/** /**

View File

@ -309,19 +309,24 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
/** /**
* Parse and fill data, looking for content * Parse and fill data, looking for content.
* We do parse first, and only fill if we're out of bytes to avoid unnecessary system calls.
*/ */
void parseAndFillForContent() void parseAndFillForContent()
{ {
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method // When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown() // doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().
int filled = Integer.MAX_VALUE;
// This loop was designed by a committee and voted by a majority.
while (_parser.inContentState()) while (_parser.inContentState())
{ {
boolean handled = parseRequestBuffer(); if (parseRequestBuffer())
if (handled || filled <= 0) break;
// Re-check the parser state after parsing to avoid filling,
// otherwise fillRequestBuffer() would acquire a ByteBuffer
// that may be leaked.
if (_parser.inContentState() && fillRequestBuffer() <= 0)
break; break;
filled = fillRequestBuffer();
} }
} }
@ -412,9 +417,21 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override @Override
public void onCompleted() public void onCompleted()
{ {
// Handle connection upgrades. // If we are fill interested, then a read is pending and we must abort
if (upgrade()) if (isFillInterested())
return; {
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
abort(new IllegalStateException());
}
else
{
// Handle connection upgrades.
if (upgrade())
return;
}
// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();
// Finish consuming the request // Finish consuming the request
// If we are still expecting // If we are still expecting
@ -424,7 +441,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_parser.close(); _parser.close();
} }
// else abort if we can't consume all // else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll()) else if (_generator.isPersistent() && !complete)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser); LOG.debug("unconsumed input {} {}", this, _parser);

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
@ -23,6 +24,7 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Destroyable; import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,10 +40,10 @@ public class HttpInput extends ServletInputStream implements Runnable
private final BlockingContentProducer _blockingContentProducer; private final BlockingContentProducer _blockingContentProducer;
private final AsyncContentProducer _asyncContentProducer; private final AsyncContentProducer _asyncContentProducer;
private final HttpChannelState _channelState; private final HttpChannelState _channelState;
private ContentProducer _contentProducer; private final LongAdder _contentConsumed = new LongAdder();
private boolean _consumedEof; private volatile ContentProducer _contentProducer;
private ReadListener _readListener; private volatile boolean _consumedEof;
private long _contentConsumed; private volatile ReadListener _readListener;
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
{ {
@ -55,11 +57,20 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this); LOG.debug("recycle {}", this);
_blockingContentProducer.recycle(); }
_contentProducer = _blockingContentProducer;
_consumedEof = false; public void reopen()
_readListener = null; {
_contentConsumed = 0; try (AutoLock lock = _contentProducer.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("reopen {}", this);
_blockingContentProducer.recycle();
_contentProducer = _blockingContentProducer;
_consumedEof = false;
_readListener = null;
_contentConsumed.reset();
}
} }
/** /**
@ -67,7 +78,10 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public Interceptor getInterceptor() public Interceptor getInterceptor()
{ {
return _contentProducer.getInterceptor(); try (AutoLock lock = _contentProducer.lock())
{
return _contentProducer.getInterceptor();
}
} }
/** /**
@ -77,9 +91,12 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public void setInterceptor(Interceptor interceptor) public void setInterceptor(Interceptor interceptor)
{ {
if (LOG.isDebugEnabled()) try (AutoLock lock = _contentProducer.lock())
LOG.debug("setting interceptor to {} on {}", interceptor, this); {
_contentProducer.setInterceptor(interceptor); if (LOG.isDebugEnabled())
LOG.debug("setting interceptor to {} on {}", interceptor, this);
_contentProducer.setInterceptor(interceptor);
}
} }
/** /**
@ -90,60 +107,72 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public void addInterceptor(Interceptor interceptor) public void addInterceptor(Interceptor interceptor)
{ {
Interceptor currentInterceptor = _contentProducer.getInterceptor(); try (AutoLock lock = _contentProducer.lock())
if (currentInterceptor == null)
{ {
if (LOG.isDebugEnabled()) Interceptor currentInterceptor = _contentProducer.getInterceptor();
LOG.debug("adding single interceptor: {} on {}", interceptor, this); if (currentInterceptor == null)
_contentProducer.setInterceptor(interceptor); {
} if (LOG.isDebugEnabled())
else LOG.debug("adding single interceptor: {} on {}", interceptor, this);
{ _contentProducer.setInterceptor(interceptor);
ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); }
if (LOG.isDebugEnabled()) else
LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this); {
_contentProducer.setInterceptor(chainedInterceptor); ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor);
if (LOG.isDebugEnabled())
LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this);
_contentProducer.setInterceptor(chainedInterceptor);
}
} }
} }
public int get(Content content, byte[] bytes, int offset, int length) private int get(Content content, byte[] bytes, int offset, int length)
{ {
int consumed = content.get(bytes, offset, length); int consumed = content.get(bytes, offset, length);
_contentConsumed += consumed; _contentConsumed.add(consumed);
return consumed; return consumed;
} }
public long getContentConsumed() public long getContentConsumed()
{ {
return _contentConsumed; return _contentConsumed.sum();
} }
public long getContentReceived() public long getContentReceived()
{ {
return _contentProducer.getRawContentArrived(); try (AutoLock lock = _contentProducer.lock())
{
return _contentProducer.getRawContentArrived();
}
} }
public boolean consumeAll() public boolean consumeAll()
{ {
IOException failure = new IOException("Unconsumed content"); try (AutoLock lock = _contentProducer.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("consumeAll {}", this, failure); IOException failure = new IOException("Unconsumed content");
boolean atEof = _contentProducer.consumeAll(failure); if (LOG.isDebugEnabled())
if (atEof) LOG.debug("consumeAll {}", this, failure);
_consumedEof = true; boolean atEof = _contentProducer.consumeAll(failure);
if (atEof)
_consumedEof = true;
if (isFinished()) if (isFinished())
return !isError(); return !isError();
return false; return false;
}
} }
public boolean isError() public boolean isError()
{ {
boolean error = _contentProducer.isError(); try (AutoLock lock = _contentProducer.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("isError={} {}", error, this); boolean error = _contentProducer.isError();
return error; if (LOG.isDebugEnabled())
LOG.debug("isError={} {}", error, this);
return error;
}
} }
public boolean isAsync() public boolean isAsync()
@ -167,10 +196,13 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public boolean isReady() public boolean isReady()
{ {
boolean ready = _contentProducer.isReady(); try (AutoLock lock = _contentProducer.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("isReady={} {}", ready, this); boolean ready = _contentProducer.isReady();
return ready; if (LOG.isDebugEnabled())
LOG.debug("isReady={} {}", ready, this);
return ready;
}
} }
@Override @Override
@ -180,10 +212,10 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug("setting read listener to {} {}", readListener, this); LOG.debug("setting read listener to {} {}", readListener, this);
if (_readListener != null) if (_readListener != null)
throw new IllegalStateException("ReadListener already set"); throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
//illegal if async not started //illegal if async not started
if (!_channelState.isAsyncStarted()) if (!_channelState.isAsyncStarted())
throw new IllegalStateException("Async not started"); throw new IllegalStateException("Async not started");
_readListener = Objects.requireNonNull(readListener);
_contentProducer = _asyncContentProducer; _contentProducer = _asyncContentProducer;
// trigger content production // trigger content production
@ -193,59 +225,68 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean onContentProducible() public boolean onContentProducible()
{ {
return _contentProducer.onContentProducible(); try (AutoLock lock = _contentProducer.lock())
{
return _contentProducer.onContentProducible();
}
} }
@Override @Override
public int read() throws IOException public int read() throws IOException
{ {
int read = read(_oneByteBuffer, 0, 1); try (AutoLock lock = _contentProducer.lock())
if (read == 0) {
throw new IOException("unready read=0"); int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException public int read(byte[] b, int off, int len) throws IOException
{ {
// Calculate minimum request rate for DoS protection try (AutoLock lock = _contentProducer.lock())
_contentProducer.checkMinDataRate();
Content content = _contentProducer.nextContent();
if (content == null)
throw new IllegalStateException("read on unready input");
if (!content.isSpecial())
{ {
int read = get(content, b, off, len); // Calculate minimum request rate for DoS protection
_contentProducer.checkMinDataRate();
Content content = _contentProducer.nextContent();
if (content == null)
throw new IllegalStateException("read on unready input");
if (!content.isSpecial())
{
int read = get(content, b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s) {}", read, this);
if (content.isEmpty())
_contentProducer.reclaim(content);
return read;
}
Throwable error = content.getError();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s) {}", read, this); LOG.debug("read error={} {}", error, this);
if (content.isEmpty()) if (error != null)
_contentProducer.reclaim(content); {
return read; if (error instanceof IOException)
} throw (IOException)error;
throw new IOException(error);
}
Throwable error = content.getError(); if (content.isEof())
if (LOG.isDebugEnabled()) {
LOG.debug("read error={} {}", error, this); if (LOG.isDebugEnabled())
if (error != null) LOG.debug("read at EOF, setting consumed EOF to true {}", this);
{ _consumedEof = true;
if (error instanceof IOException) // If EOF do we need to wake for allDataRead callback?
throw (IOException)error; if (onContentProducible())
throw new IOException(error); scheduleReadListenerNotification();
} return -1;
}
if (content.isEof()) throw new AssertionError("no data, no error and not EOF");
{
if (LOG.isDebugEnabled())
LOG.debug("read at EOF, setting consumed EOF to true {}", this);
_consumedEof = true;
// If EOF do we need to wake for allDataRead callback?
if (onContentProducible())
scheduleReadListenerNotification();
return -1;
} }
throw new AssertionError("no data, no error and not EOF");
} }
private void scheduleReadListenerNotification() private void scheduleReadListenerNotification()
@ -261,21 +302,27 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public boolean hasContent() public boolean hasContent()
{ {
// Do not call _contentProducer.available() as it calls HttpChannel.produceContent() try (AutoLock lock = _contentProducer.lock())
// which is forbidden by this method's contract. {
boolean hasContent = _contentProducer.hasContent(); // Do not call _contentProducer.available() as it calls HttpChannel.produceContent()
if (LOG.isDebugEnabled()) // which is forbidden by this method's contract.
LOG.debug("hasContent={} {}", hasContent, this); boolean hasContent = _contentProducer.hasContent();
return hasContent; if (LOG.isDebugEnabled())
LOG.debug("hasContent={} {}", hasContent, this);
return hasContent;
}
} }
@Override @Override
public int available() public int available()
{ {
int available = _contentProducer.available(); try (AutoLock lock = _contentProducer.lock())
if (LOG.isDebugEnabled()) {
LOG.debug("available={} {}", available, this); int available = _contentProducer.available();
return available; if (LOG.isDebugEnabled())
LOG.debug("available={} {}", available, this);
return available;
}
} }
/* Runnable */ /* Runnable */
@ -287,16 +334,20 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public void run() public void run()
{ {
// Call isReady() to make sure that if not ready we register for fill interest. Content content;
if (!_contentProducer.isReady()) try (AutoLock lock = _contentProducer.lock())
{ {
// Call isReady() to make sure that if not ready we register for fill interest.
if (!_contentProducer.isReady())
{
if (LOG.isDebugEnabled())
LOG.debug("running but not ready {}", this);
return;
}
content = _contentProducer.nextContent();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("running but not ready {}", this); LOG.debug("running on content {} {}", content, this);
return;
} }
Content content = _contentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("running on content {} {}", content, this);
// This check is needed when a request is started async but no read listener is registered. // This check is needed when a request is started async but no read listener is registered.
if (_readListener == null) if (_readListener == null)

View File

@ -23,6 +23,8 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder; import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult; import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
@ -61,7 +63,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
enum State enum State
{ {
OPEN, // Open OPEN, // Open
CLOSE, // Close needed from onWriteCompletion CLOSE, // Close needed from onWriteComplete
CLOSING, // Close in progress after close API called CLOSING, // Close in progress after close API called
CLOSED // Closed CLOSED // Closed
} }
@ -292,7 +294,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
// Somebody called close or complete while we were writing. // Somebody called close or complete while we were writing.
// We can now send a (probably empty) last buffer and then when it completes // We can now send a (probably empty) last buffer and then when it completes
// onWriteCompletion will be called again to actually execute the _completeCallback // onWriteComplete will be called again to actually execute the _completeCallback
_state = State.CLOSING; _state = State.CLOSING;
closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
} }
@ -395,53 +397,87 @@ public class HttpOutput extends ServletOutputStream implements Runnable
ByteBuffer content = null; ByteBuffer content = null;
try (AutoLock l = _channelState.lock()) try (AutoLock l = _channelState.lock())
{ {
switch (_state) // First check the API state for any unrecoverable situations
switch (_apiState)
{ {
case CLOSED: case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time
succeeded = true; error = new CancellationException("Completed whilst write unready");
break; break;
case CLOSE: case PENDING: // an async write is pending and may complete at any time
case CLOSING: // If this is not the last write, then we must abort
_closedCallback = Callback.combine(_closedCallback, callback); if (!_channel.getResponse().isContentComplete(_written))
error = new CancellationException("Completed whilst write pending");
break; break;
case OPEN: case BLOCKED: // another thread is blocked in a write or a close
if (_onError != null) error = new CancellationException("Completed whilst write blocked");
{ break;
error = _onError;
default:
break;
}
// If we can't complete due to the API state, then abort
if (error != null)
{
_channel.abort(error);
_writeBlocker.fail(error);
_state = State.CLOSED;
}
else
{
// Otherwise check the output state to determine how to complete
switch (_state)
{
case CLOSED:
succeeded = true;
break; break;
}
_closedCallback = Callback.combine(_closedCallback, callback); case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
break;
switch (_apiState) case OPEN:
{ if (_onError != null)
case BLOCKING: {
// Output is idle blocking state, but we still do an async close error = _onError;
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break; break;
}
case ASYNC: _closedCallback = Callback.combine(_closedCallback, callback);
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case BLOCKED: switch (_apiState)
case UNREADY: {
case PENDING: case BLOCKING:
// An operation is in progress, so we soft close now // Output is idle blocking state, but we still do an async close
_softClose = true; _apiState = ApiState.BLOCKED;
// then trigger a close from onWriteComplete _state = State.CLOSING;
_state = State.CLOSE; content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break; break;
}
break; case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case UNREADY:
case PENDING:
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;
default:
throw new IllegalStateException();
}
break;
}
} }
} }
@ -1351,7 +1387,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_state = State.OPEN; _state = State.OPEN;
_apiState = ApiState.BLOCKING; _apiState = ApiState.BLOCKING;
_softClose = false; _softClose = true; // Stay closed until next request
_interceptor = _channel; _interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration(); HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize(); _bufferSize = config.getOutputBufferSize();

View File

@ -1681,6 +1681,11 @@ public class Request implements HttpServletRequest
*/ */
public void setMetaData(MetaData.Request request) public void setMetaData(MetaData.Request request)
{ {
if (_metaData == null && _input != null && _channel != null)
{
_input.reopen();
_channel.getResponse().getHttpOutput().reopen();
}
_metaData = request; _metaData = request;
_method = request.getMethod(); _method = request.getMethod();
_httpFields = request.getFields(); _httpFields = request.getFields();

View File

@ -273,8 +273,11 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
@Override @Override
protected void onCompleteFailure(Throwable x) protected void onCompleteFailure(Throwable x)
{ {
_deflaterEntry.release(); if (_deflaterEntry != null)
_deflaterEntry = null; {
_deflaterEntry.release();
_deflaterEntry = null;
}
super.onCompleteFailure(x); super.onCompleteFailure(x);
} }

View File

@ -28,8 +28,8 @@ import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.compression.CompressionPool;
import org.eclipse.jetty.util.compression.InflaterPool; import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.thread.AutoLock;
import org.hamcrest.core.Is; import org.hamcrest.core.Is;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -72,8 +72,11 @@ public class AsyncContentProducerTest
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); try (AutoLock lock = contentProducer.lock())
assertThat(error, nullValue()); {
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, nullValue());
}
} }
@Test @Test
@ -91,8 +94,11 @@ public class AsyncContentProducerTest
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); try (AutoLock lock = contentProducer.lock())
assertThat(error, Is.is(expectedError)); {
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, Is.is(expectedError));
}
} }
@Test @Test
@ -113,10 +119,13 @@ public class AsyncContentProducerTest
CyclicBarrier barrier = new CyclicBarrier(2); CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, nullValue()); assertThat(error, nullValue());
}
} }
@Test @Test
@ -137,10 +146,13 @@ public class AsyncContentProducerTest
CyclicBarrier barrier = new CyclicBarrier(2); CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier); Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier);
assertThat(error, nullValue()); assertThat(error, nullValue());
}
} }
@Test @Test
@ -162,10 +174,13 @@ public class AsyncContentProducerTest
CyclicBarrier barrier = new CyclicBarrier(2); CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, Is.is(expectedError)); assertThat(error, Is.is(expectedError));
}
} }
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException

View File

@ -20,13 +20,13 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.compression.InflaterPool; import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.thread.AutoLock;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -63,13 +63,16 @@ public class BlockingContentProducerTest
final int totalContentBytesCount = countRemaining(buffers); final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers); final String originalContentString = asString(buffers);
AtomicReference<ContentProducer> ref = new AtomicReference<>(); ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer); contentListener.setContentProducer(contentProducer);
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); try (AutoLock lock = contentProducer.lock())
assertThat(error, nullValue()); {
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, nullValue());
}
} }
@Test @Test
@ -83,13 +86,16 @@ public class BlockingContentProducerTest
final String originalContentString = asString(buffers); final String originalContentString = asString(buffers);
final Throwable expectedError = new EofException("Early EOF"); final Throwable expectedError = new EofException("Early EOF");
AtomicReference<ContentProducer> ref = new AtomicReference<>(); ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer); contentListener.setContentProducer(contentProducer);
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); try (AutoLock lock = contentProducer.lock())
assertThat(error, is(expectedError)); {
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, is(expectedError));
}
} }
@Test @Test
@ -107,14 +113,18 @@ public class BlockingContentProducerTest
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>(); ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer); contentListener.setContentProducer(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); try (AutoLock lock = contentProducer.lock())
assertThat(error, nullValue()); {
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, nullValue());
}
} }
@Test @Test
@ -132,14 +142,18 @@ public class BlockingContentProducerTest
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>(); ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer); contentListener.setContentProducer(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer); try (AutoLock lock = contentProducer.lock())
assertThat(error, nullValue()); {
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer);
assertThat(error, nullValue());
}
} }
@Test @Test
@ -158,14 +172,18 @@ public class BlockingContentProducerTest
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>(); ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer); contentListener.setContentProducer(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); try (AutoLock lock = contentProducer.lock())
assertThat(error, is(expectedError)); {
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, is(expectedError));
}
} }
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer) private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer)
@ -241,9 +259,26 @@ public class BlockingContentProducerTest
} }
} }
private interface ContentListener private static class ContentListener
{ {
void onContent(); private ContentProducer contentProducer;
private ContentListener()
{
}
private void onContent()
{
try (AutoLock lock = contentProducer.lock())
{
contentProducer.onContentProducible();
}
}
private void setContentProducer(ContentProducer contentProducer)
{
this.contentProducer = contentProducer;
}
} }
private static class ArrayDelayedHttpChannel extends HttpChannel private static class ArrayDelayedHttpChannel extends HttpChannel

View File

@ -0,0 +1,604 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BlockingTest
{
private Server server;
ServerConnector connector;
private ContextHandler context;
@BeforeEach
void setUp()
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
context = new ContextHandler("/ctx");
HandlerList handlers = new HandlerList();
handlers.setHandlers(new Handler[]{context, new DefaultHandler()});
server.setHandler(handlers);
}
@AfterEach
void tearDown() throws Exception
{
server.stop();
}
@Test
public void testBlockingReadThenNormalComplete() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("OK"));
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testBlockingReadAndBlockingWriteGzipped() throws Exception
{
AtomicReference<Thread> threadRef = new AtomicReference<>();
CyclicBarrier barrier = new CyclicBarrier(2);
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
final AsyncContext asyncContext = baseRequest.startAsync();
final ServletOutputStream outputStream = response.getOutputStream();
final Thread thread = new Thread(() ->
{
try
{
for (int i = 0; i < 5; i++)
{
int b = baseRequest.getHttpInput().read();
assertThat(b, not(is(-1)));
}
outputStream.write("All read.".getBytes(StandardCharsets.UTF_8));
barrier.await(); // notify that all bytes were read
baseRequest.getHttpInput().read(); // this read should throw IOException as the client has closed the connection
throw new AssertionError("should have thrown IOException");
}
catch (Exception e)
{
//throw new RuntimeException(e);
}
finally
{
try
{
outputStream.close();
}
catch (Exception e2)
{
//e2.printStackTrace();
}
asyncContext.complete();
}
});
threadRef.set(thread);
thread.start();
barrier.await(); // notify that handler thread has started
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
};
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setMinGzipSize(1);
gzipHandler.setHandler(handler);
context.setHandler(gzipHandler);
// using the GzipHandler is mandatory to reproduce the
// context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
// partial chunked request
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Accept-Encoding: gzip, *\r\n")
.append("Content-Type: test/data\r\n")
.append("Transfer-Encoding: chunked\r\n")
.append("\r\n")
.append("10\r\n")
.append("01234")
;
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoLinger(true, 0); // send TCP RST upon close instead of FIN
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
barrier.await(); // wait for handler thread to be started
barrier.await(); // wait for all bytes of the request to be read
}
threadRef.get().join(5000);
assertThat("handler thread should not be alive anymore", threadRef.get().isAlive(), is(false));
}
@Test
public void testNormalCompleteThenBlockingRead() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), containsString("OK"));
completed.countDown();
Thread.sleep(1000);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
AsyncContext async = request.startAsync();
async.setTimeout(100);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(500));
assertThat(response.getContent(), containsString("AsyncContext timeout"));
completed.countDown();
Thread.sleep(1000);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testBlockingReadThenSendError() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.sendError(499);
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("POST /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: test/data\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertThat(response, notNullValue());
assertThat(response.getStatus(), is(499));
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
}
}
@Test
public void testBlockingWriteThenNormalComplete() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentType("text/plain");
new Thread(() ->
{
try
{
byte[] data = new byte[16 * 1024];
Arrays.fill(data, (byte)'X');
data[data.length - 2] = '\r';
data[data.length - 1] = '\n';
OutputStream out = response.getOutputStream();
started.countDown();
while (true)
out.write(data);
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on write
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("GET /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("\r\n");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
// Read the header
List<String> header = new ArrayList<>();
while (true)
{
String line = in.readLine();
if (line.length() == 0)
break;
header.add(line);
}
assertThat(header.get(0), containsString("200 OK"));
// read one line of content
String content = in.readLine();
assertThat(content, is("4000"));
content = in.readLine();
assertThat(content, startsWith("XXXXXXXX"));
// check that writing thread is stopped by end of request handling
assertTrue(stopped.await(10, TimeUnit.SECONDS));
// read until last line
String last = null;
while (true)
{
String line = in.readLine();
if (line == null)
break;
last = line;
}
// last line is not empty chunk, ie abnormal completion
assertThat(last, startsWith("XXXXX"));
}
}
}

View File

@ -643,6 +643,7 @@ public class ResponseTest
assertEquals("foo2/bar2;charset=utf-8", response.getContentType()); assertEquals("foo2/bar2;charset=utf-8", response.getContentType());
response.recycle(); response.recycle();
response.reopen();
response.setCharacterEncoding("utf16"); response.setCharacterEncoding("utf16");
response.setContentType("text/html; charset=utf-8"); response.setContentType("text/html; charset=utf-8");
@ -655,6 +656,7 @@ public class ResponseTest
assertEquals("text/xml;charset=utf-8", response.getContentType()); assertEquals("text/xml;charset=utf-8", response.getContentType());
response.recycle(); response.recycle();
response.reopen();
response.setCharacterEncoding("utf-16"); response.setCharacterEncoding("utf-16");
response.setContentType("foo/bar"); response.setContentType("foo/bar");
assertEquals("foo/bar;charset=utf-16", response.getContentType()); assertEquals("foo/bar;charset=utf-16", response.getContentType());

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.util;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Objects;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@ -44,10 +45,10 @@ public class SharedBlockingCallback
{ {
private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class); private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class);
private static Throwable IDLE = new ConstantThrowable("IDLE"); private static final Throwable IDLE = new ConstantThrowable("IDLE");
private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED");
private static Throwable FAILED = new ConstantThrowable("FAILED"); private static final Throwable FAILED = new ConstantThrowable("FAILED");
private final ReentrantLock _lock = new ReentrantLock(); private final ReentrantLock _lock = new ReentrantLock();
private final Condition _idle = _lock.newCondition(); private final Condition _idle = _lock.newCondition();
@ -76,6 +77,26 @@ public class SharedBlockingCallback
} }
} }
public boolean fail(Throwable cause)
{
Objects.requireNonNull(cause);
_lock.lock();
try
{
if (_blocker._state == null)
{
_blocker._state = new BlockerFailedException(cause);
_complete.signalAll();
return true;
}
}
finally
{
_lock.unlock();
}
return false;
}
protected void notComplete(Blocker blocker) protected void notComplete(Blocker blocker)
{ {
LOG.warn("Blocker not complete {}", blocker); LOG.warn("Blocker not complete {}", blocker);
@ -145,10 +166,12 @@ public class SharedBlockingCallback
_state = cause; _state = cause;
_complete.signalAll(); _complete.signalAll();
} }
else if (_state instanceof BlockerTimeoutException) else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException)
{ {
// Failure arrived late, block() already // Failure arrived late, block() already
// modified the state, nothing more to do. // modified the state, nothing more to do.
if (LOG.isDebugEnabled())
LOG.debug("Failed after {}", _state);
} }
else else
{ {
@ -261,4 +284,12 @@ public class SharedBlockingCallback
private static class BlockerTimeoutException extends TimeoutException private static class BlockerTimeoutException extends TimeoutException
{ {
} }
private static class BlockerFailedException extends Exception
{
public BlockerFailedException(Throwable cause)
{
super(cause);
}
}
} }

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BlockedIOTest extends AbstractTest<TransportScenario>
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testBlockingReadThenNormalComplete(Transport transport) throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AtomicReference<Throwable> rereadException = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable ex1)
{
readException.set(ex1);
try
{
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
catch (Throwable ex2)
{
rereadException.set(ex2);
}
finally
{
stopped.countDown();
}
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch ok = new CountDownLatch(2);
scenario.client.newRequest(scenario.newURI())
.method("POST")
.content(contentProvider)
.onResponseContent((response, content) ->
{
assertThat(BufferUtil.toString(content), containsString("OK"));
ok.countDown();
})
.onResponseSuccess(response ->
{
try
{
assertThat(response.getStatus(), is(200));
stopped.await(10, TimeUnit.SECONDS);
ok.countDown();
}
catch (Throwable t)
{
t.printStackTrace();
}
})
.send(null);
contentProvider.offer(BufferUtil.toBuffer("1"));
assertTrue(ok.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
assertThat(rereadException.get(), instanceOf(IOException.class));
}
}