Revert "Revert "Fixed AsyncIO double dispatch""
This reverts commit 8bd94ec6be
.
This is a revert of the revert to add back in the useful debug, plus some TODO comments describing the problems
This commit is contained in:
parent
8bd94ec6be
commit
b60ea47ef4
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.http2.server;
|
||||||
import org.eclipse.jetty.server.HttpChannelState;
|
import org.eclipse.jetty.server.HttpChannelState;
|
||||||
import org.eclipse.jetty.server.HttpInput;
|
import org.eclipse.jetty.server.HttpInput;
|
||||||
|
|
||||||
|
|
||||||
|
// TODO This class is the same as the default. Is it needed?
|
||||||
public class HttpInputOverHTTP2 extends HttpInput
|
public class HttpInputOverHTTP2 extends HttpInput
|
||||||
{
|
{
|
||||||
private final HttpChannelState _httpChannelState;
|
private final HttpChannelState _httpChannelState;
|
||||||
|
|
|
@ -60,6 +60,8 @@ public abstract class FillInterest
|
||||||
}
|
}
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} register {}",this,callback);
|
||||||
needsFillInterest();
|
needsFillInterest();
|
||||||
}
|
}
|
||||||
catch (Throwable e)
|
catch (Throwable e)
|
||||||
|
@ -78,7 +80,7 @@ public abstract class FillInterest
|
||||||
LOG.debug("{} fillable {}",this,callback);
|
LOG.debug("{} fillable {}",this,callback);
|
||||||
if (callback != null && _interested.compareAndSet(callback, null))
|
if (callback != null && _interested.compareAndSet(callback, null))
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
else
|
else if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} lost race {}",this,callback);
|
LOG.debug("{} lost race {}",this,callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -102,11 +102,16 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
||||||
|
|
||||||
protected HttpInput newHttpInput()
|
protected HttpInput newHttpInput()
|
||||||
{
|
{
|
||||||
|
// TODO - this is a convoluted way to construct the HttpInput. Can this be simplified?
|
||||||
|
// TODO - the issue is that the HttpInput needs access to the HttpState, which is not
|
||||||
|
// TODO - constructed until the HttpChannel is constructed... so there is chicken and egg.
|
||||||
return new HttpInput()
|
return new HttpInput()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void onReadPossible()
|
protected void onReadPossible()
|
||||||
{
|
{
|
||||||
|
// TODO All three implementations just do this? do we need to override onReadPossible at all?
|
||||||
|
|
||||||
getState().onReadPossible();
|
getState().onReadPossible();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -730,8 +730,8 @@ public class HttpChannelState
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO, do we need to execute? or should it be done elsewhere?
|
||||||
if (handle)
|
if (handle)
|
||||||
// TODO, do we need to execute or just run?
|
|
||||||
_channel.execute(_channel);
|
_channel.execute(_channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -538,7 +538,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
{
|
{
|
||||||
_input.failed(x);
|
_input.failed(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class AsyncReadCallback implements Callback
|
private class AsyncReadCallback implements Callback
|
||||||
|
@ -547,8 +546,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
if (parseContent())
|
if (parseContent())
|
||||||
_channel.handle();
|
_channel.handle(); // TODO this call to handle can be duplicated by HttpInput.addContent calling onReadPossible
|
||||||
else if (!_input.isFinished())
|
else if (!_input.isFinished())
|
||||||
|
// TODO This may not always be correct. The main use-case is when the asyncReadCallback has succeeded because of
|
||||||
|
// some data that is not sufficient to produce anything to read (Eg one byte of a chunk header). We can't add
|
||||||
|
// zero length content because HttpInput.read() cannot return 0 as no bytes read! So instead we just say we are
|
||||||
|
// fill interested and look for more content. BUT maybe there is a case when this is not needed..... hmmm I think
|
||||||
|
// this is probably OK as the AsyncReadCallback is only ever used when there is not another thread reading etc.
|
||||||
asyncReadFillInterested();
|
asyncReadFillInterested();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,7 +564,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private class SendCallback extends IteratingCallback
|
private class SendCallback extends IteratingCallback
|
||||||
{
|
{
|
||||||
private MetaData.Response _info;
|
private MetaData.Response _info;
|
||||||
|
|
|
@ -347,6 +347,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO currently this is an active method that can dispatch a call to HttpChannel.handle. This can be duplicated by the AsyncReadCallback!
|
||||||
if (call_on_read_possible)
|
if (call_on_read_possible)
|
||||||
onReadPossible();
|
onReadPossible();
|
||||||
}
|
}
|
||||||
|
|
|
@ -356,6 +356,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
@Override
|
@Override
|
||||||
public void execute(Runnable job)
|
public void execute(Runnable job)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("queue {}",job);
|
||||||
if (!isRunning() || !_jobs.offer(job))
|
if (!isRunning() || !_jobs.offer(job))
|
||||||
{
|
{
|
||||||
LOG.warn("{} rejected {}", this, job);
|
LOG.warn("{} rejected {}", this, job);
|
||||||
|
@ -552,7 +554,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
// Job loop
|
// Job loop
|
||||||
while (job != null && isRunning())
|
while (job != null && isRunning())
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("run {}",job);
|
||||||
runJob(job);
|
runJob(job);
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("ran {}",job);
|
||||||
if (Thread.interrupted())
|
if (Thread.interrupted())
|
||||||
{
|
{
|
||||||
ignore=true;
|
ignore=true;
|
||||||
|
|
Loading…
Reference in New Issue