Fixed AsyncIO double dispatch

This commit is contained in:
Greg Wilkins 2015-02-19 18:13:31 +11:00
parent bb8b5f9b76
commit 968063c1ab
6 changed files with 14 additions and 6 deletions

View File

@ -33,6 +33,6 @@ public class HttpInputOverHTTP2 extends HttpInput
@Override @Override
protected void onReadPossible() protected void onReadPossible()
{ {
_httpChannelState.onReadPossible(); _httpChannelState.onReadPossible(true);
} }
} }

View File

@ -60,6 +60,8 @@ public abstract class FillInterest
} }
try try
{ {
if (LOG.isDebugEnabled())
LOG.debug("{} register {}",this,callback);
needsFillInterest(); needsFillInterest();
} }
catch (Throwable e) catch (Throwable e)
@ -78,7 +80,7 @@ public abstract class FillInterest
LOG.debug("{} fillable {}",this,callback); LOG.debug("{} fillable {}",this,callback);
if (callback != null && _interested.compareAndSet(callback, null)) if (callback != null && _interested.compareAndSet(callback, null))
callback.succeeded(); callback.succeeded();
else else if (LOG.isDebugEnabled())
LOG.debug("{} lost race {}",this,callback); LOG.debug("{} lost race {}",this,callback);
} }

View File

@ -107,7 +107,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override @Override
protected void onReadPossible() protected void onReadPossible()
{ {
getState().onReadPossible(); getState().onReadPossible(true);
} }
}; };
} }

View File

@ -716,7 +716,7 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute); _channel.getRequest().setAttribute(name,attribute);
} }
public void onReadPossible() public void onReadPossible(boolean execute)
{ {
boolean handle=false; boolean handle=false;
@ -730,7 +730,7 @@ public class HttpChannelState
} }
} }
if (handle) if (execute && handle)
// TODO, do we need to execute or just run? // TODO, do we need to execute or just run?
_channel.execute(_channel); _channel.execute(_channel);
} }

View File

@ -51,6 +51,6 @@ public class HttpInputOverHTTP extends HttpInput
@Override @Override
protected void onReadPossible() protected void onReadPossible()
{ {
_httpConnection.getHttpChannel().getState().onReadPossible(); _httpConnection.getHttpChannel().getState().onReadPossible(false); // Will be handled by async callback
} }
} }

View File

@ -356,6 +356,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override @Override
public void execute(Runnable job) public void execute(Runnable job)
{ {
if (LOG.isDebugEnabled())
LOG.debug("queue {}",job);
if (!isRunning() || !_jobs.offer(job)) if (!isRunning() || !_jobs.offer(job))
{ {
LOG.warn("{} rejected {}", this, job); LOG.warn("{} rejected {}", this, job);
@ -552,7 +554,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
// Job loop // Job loop
while (job != null && isRunning()) while (job != null && isRunning())
{ {
if (LOG.isDebugEnabled())
LOG.debug("run {}",job);
runJob(job); runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {}",job);
if (Thread.interrupted()) if (Thread.interrupted())
{ {
ignore=true; ignore=true;