Issue #1721 async read failure on big POST
Modified isReady to not fillAndParseContent if known to be isReady()==false already. Added mutex on produceContent
This commit is contained in:
parent
a58447aea1
commit
3c3d05f722
|
@ -39,7 +39,6 @@ public abstract class FillInterest
|
||||||
{
|
{
|
||||||
private final static Logger LOG = Log.getLogger(FillInterest.class);
|
private final static Logger LOG = Log.getLogger(FillInterest.class);
|
||||||
private final AtomicReference<Callback> _interested = new AtomicReference<>(null);
|
private final AtomicReference<Callback> _interested = new AtomicReference<>(null);
|
||||||
private Throwable _lastSet;
|
|
||||||
|
|
||||||
protected FillInterest()
|
protected FillInterest()
|
||||||
{
|
{
|
||||||
|
@ -58,8 +57,6 @@ public abstract class FillInterest
|
||||||
if (!tryRegister(callback))
|
if (!tryRegister(callback))
|
||||||
{
|
{
|
||||||
LOG.warn("Read pending for {} prevented {}", _interested, callback);
|
LOG.warn("Read pending for {} prevented {}", _interested, callback);
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.warn("callback set at ",_lastSet);
|
|
||||||
throw new ReadPendingException();
|
throw new ReadPendingException();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,10 +78,7 @@ public abstract class FillInterest
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
{
|
LOG.debug("interested {}",this);
|
||||||
LOG.debug("{} register {}",this,callback);
|
|
||||||
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -103,9 +97,9 @@ public abstract class FillInterest
|
||||||
*/
|
*/
|
||||||
public void fillable()
|
public void fillable()
|
||||||
{
|
{
|
||||||
Callback callback = _interested.get();
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} fillable {}",this,callback);
|
LOG.debug("fillable {}",this);
|
||||||
|
Callback callback = _interested.get();
|
||||||
if (callback != null && _interested.compareAndSet(callback, null))
|
if (callback != null && _interested.compareAndSet(callback, null))
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
else if (LOG.isDebugEnabled())
|
else if (LOG.isDebugEnabled())
|
||||||
|
@ -134,6 +128,8 @@ public abstract class FillInterest
|
||||||
*/
|
*/
|
||||||
public boolean onFail(Throwable cause)
|
public boolean onFail(Throwable cause)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("onFail {} {}",this,cause);
|
||||||
Callback callback = _interested.get();
|
Callback callback = _interested.get();
|
||||||
if (callback != null && _interested.compareAndSet(callback, null))
|
if (callback != null && _interested.compareAndSet(callback, null))
|
||||||
{
|
{
|
||||||
|
@ -145,9 +141,9 @@ public abstract class FillInterest
|
||||||
|
|
||||||
public void onClose()
|
public void onClose()
|
||||||
{
|
{
|
||||||
Callback callback = _interested.get();
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} onClose {}",this,callback);
|
LOG.debug("onClose {}",this);
|
||||||
|
Callback callback = _interested.get();
|
||||||
if (callback != null && _interested.compareAndSet(callback, null))
|
if (callback != null && _interested.compareAndSet(callback, null))
|
||||||
callback.failed(new ClosedChannelException());
|
callback.failed(new ClosedChannelException());
|
||||||
}
|
}
|
||||||
|
@ -155,7 +151,7 @@ public abstract class FillInterest
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get());
|
return String.format("FillInterest@%x{%s}", hashCode(), _interested.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -404,7 +404,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
||||||
|
|
||||||
case READ_PRODUCE:
|
case READ_PRODUCE:
|
||||||
{
|
{
|
||||||
_request.getHttpInput().produceContent();
|
_request.getHttpInput().asyncReadProduce();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -330,6 +330,18 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
protected void produceContent() throws IOException
|
protected void produceContent() throws IOException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by channel when asynchronous IO needs to produce more content
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void asyncReadProduce() throws IOException
|
||||||
|
{
|
||||||
|
synchronized (_inputQ)
|
||||||
|
{
|
||||||
|
produceContent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed.
|
* Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed.
|
||||||
|
@ -707,6 +719,8 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
return true;
|
return true;
|
||||||
if (_state instanceof EOFState)
|
if (_state instanceof EOFState)
|
||||||
return true;
|
return true;
|
||||||
|
if (_waitingForContent)
|
||||||
|
return false;
|
||||||
if (produceNextContext() != null)
|
if (produceNextContext() != null)
|
||||||
return true;
|
return true;
|
||||||
_channelState.onReadUnready();
|
_channelState.onReadUnready();
|
||||||
|
|
Loading…
Reference in New Issue