Jetty9 - Added calls to notify() also when calling shutdown() and earlyEOF(), to wake up threads waiting in blockForContent().

Also improved the blockForContent() loop condition, to exit when a shutdown or earlyEOF happens.
This commit is contained in:
Simone Bordet 2012-08-28 09:31:30 +02:00
parent 0acf2f9650
commit 34ee7101cc
1 changed files with 24 additions and 8 deletions

View File

@ -25,6 +25,8 @@ import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
@ -38,6 +40,7 @@ import org.eclipse.jetty.util.ArrayQueue;
*/
public abstract class HttpInput<T> extends ServletInputStream
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
private boolean _earlyEOF;
private boolean _inputEOF;
@ -89,9 +92,9 @@ public abstract class HttpInput<T> extends ServletInputStream
@Override
public int read(byte[] b, int off, int len) throws IOException
{
T item = null;
synchronized (lock())
{
T item = null;
while (item == null)
{
item = _inputQ.peekUnsafe();
@ -99,6 +102,7 @@ public abstract class HttpInput<T> extends ServletInputStream
{
_inputQ.pollUnsafe();
onContentConsumed(item);
LOG.debug("{} consumed {}", this, item);
item = _inputQ.peekUnsafe();
}
@ -106,11 +110,11 @@ public abstract class HttpInput<T> extends ServletInputStream
{
onAllContentConsumed();
if (_earlyEOF)
if (isEarlyEOF())
throw new EofException();
// check for EOF
if (_inputEOF)
if (isShutdown())
{
onEOF();
return -1;
@ -119,9 +123,8 @@ public abstract class HttpInput<T> extends ServletInputStream
blockForContent();
}
}
return get(item, b, off, len);
}
return get(item, b, off, len);
}
protected abstract int remaining(T item);
@ -134,10 +137,11 @@ public abstract class HttpInput<T> extends ServletInputStream
{
synchronized (lock())
{
while (_inputQ.isEmpty())
while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
{
try
{
LOG.debug("{} waiting for content", this);
lock().wait();
}
catch (InterruptedException e)
@ -170,6 +174,7 @@ public abstract class HttpInput<T> extends ServletInputStream
// caller that the buffers can be recycled.
_inputQ.add(item);
onContentQueued(item);
LOG.debug("{} queued {}", this, item);
}
return true;
}
@ -179,6 +184,16 @@ public abstract class HttpInput<T> extends ServletInputStream
synchronized (lock())
{
_earlyEOF = true;
lock().notify();
LOG.debug("{} early EOF", this);
}
}
public boolean isEarlyEOF()
{
synchronized (lock())
{
return _earlyEOF;
}
}
@ -187,6 +202,8 @@ public abstract class HttpInput<T> extends ServletInputStream
synchronized (lock())
{
_inputEOF = true;
lock().notify();
LOG.debug("{} shutdown", this);
}
}
@ -202,7 +219,7 @@ public abstract class HttpInput<T> extends ServletInputStream
{
synchronized (lock())
{
while (!_inputEOF && !_earlyEOF)
while (!isShutdown() && !isEarlyEOF())
{
T item = _inputQ.peekUnsafe();
while (item != null)
@ -221,7 +238,6 @@ public abstract class HttpInput<T> extends ServletInputStream
}
catch (IOException e)
{
e.printStackTrace();
throw new RuntimeIOException(e);
}
}