460210 - ExecutionStragegy producer for SelectManager calls onOpen from produce method

Further refactoring.   Handle the initial onDataAvailable and final onAllDataRead calls
specially, as they may need to be called without scheduling read interest.
This commit is contained in:
Greg Wilkins 2015-02-25 10:47:09 +11:00
parent 7c315ebce5
commit f44bf8e368
5 changed files with 83 additions and 21 deletions

View File

@ -35,6 +35,7 @@ public abstract class FillInterest
{ {
private final static Logger LOG = Log.getLogger(FillInterest.class); private final static Logger LOG = Log.getLogger(FillInterest.class);
private final AtomicReference<Callback> _interested = new AtomicReference<>(null); private final AtomicReference<Callback> _interested = new AtomicReference<>(null);
private Throwable _lastSet;
protected FillInterest() protected FillInterest()
{ {
@ -53,9 +54,19 @@ public abstract class FillInterest
if (callback == null) if (callback == null)
throw new IllegalArgumentException(); throw new IllegalArgumentException();
if (!_interested.compareAndSet(null, callback)) if (_interested.compareAndSet(null, callback))
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable();
}
}
else
{ {
LOG.warn("Read pending for {} prevented {}", _interested, callback); LOG.warn("Read pending for {} prevented {}", _interested, callback);
if (LOG.isDebugEnabled())
LOG.warn("callback set at ",_lastSet);
throw new ReadPendingException(); throw new ReadPendingException();
} }
try try

View File

@ -37,6 +37,10 @@ import org.eclipse.jetty.util.thread.SpinLock;
/** /**
* Implementation of AsyncContext interface that holds the state of request-response cycle. * Implementation of AsyncContext interface that holds the state of request-response cycle.
*/ */
/* ------------------------------------------------------------ */
/**
*/
public class HttpChannelState public class HttpChannelState
{ {
private static final Logger LOG = Log.getLogger(HttpChannelState.class); private static final Logger LOG = Log.getLogger(HttpChannelState.class);
@ -747,6 +751,14 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute); _channel.getRequest().setAttribute(name,attribute);
} }
/* ------------------------------------------------------------ */
/** Called to signal async read isReady() has returned false.
* This indicates that there is no content available to be consumed
* and that once the channel enteres the ASYNC_WAIT state it will
* register for read interest by calling {@link HttpChannel#asyncReadFillInterested()}
* either from this method or from a subsequent call to {@link #unhandle()}.
*/
public void onReadUnready() public void onReadUnready()
{ {
boolean interested=false; boolean interested=false;
@ -766,6 +778,13 @@ public class HttpChannelState
_channel.asyncReadFillInterested(); _channel.asyncReadFillInterested();
} }
/* ------------------------------------------------------------ */
/** Called to signal that content is now available to read.
* If the channel is in ASYNC_WAIT state and unready (ie isReady() has
* returned false), then the state is changed to ASYNC_WOKEN and true
* is returned.
* @return True IFF the channel was unready and in ASYNC_WAIT state
*/
public boolean onReadPossible() public boolean onReadPossible()
{ {
boolean woken=false; boolean woken=false;
@ -781,6 +800,29 @@ public class HttpChannelState
return woken; return woken;
} }
/* ------------------------------------------------------------ */
/** Called to signal that the channel is ready for a callback.
* This is similar to calling {@link #onReadUnready()} followed by
* {@link #onReadPossible()}, except that as content is already
* available, read interest is never set.
* @return
*/
public boolean onReadReady()
{
boolean woken=false;
try(SpinLock.Lock lock=_lock.lock())
{
_asyncReadUnready=true;
_asyncReadPossible=true;
if (_state==State.ASYNC_WAIT)
{
woken=true;
_state=State.ASYNC_WOKEN;
}
}
return woken;
}
public boolean isReadPossible() public boolean isReadPossible()
{ {
try(SpinLock.Lock lock=_lock.lock()) try(SpinLock.Lock lock=_lock.lock())

View File

@ -207,8 +207,7 @@ public class HttpInput extends ServletInputStream implements Runnable
else else
{ {
_state=AEOF; _state=AEOF;
_channelState.onReadUnready(); boolean woken = _channelState.onReadReady(); // force callback?
boolean woken = _channelState.onReadPossible(); // force callback
if (woken) if (woken)
wake(); wake();
} }
@ -502,7 +501,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener) public void setReadListener(ReadListener readListener)
{ {
readListener = Objects.requireNonNull(readListener); readListener = Objects.requireNonNull(readListener);
boolean content; boolean woken=false;
try try
{ {
synchronized (_inputQ) synchronized (_inputQ)
@ -511,8 +510,11 @@ public class HttpInput extends ServletInputStream implements Runnable
throw new IllegalStateException("state=" + _state); throw new IllegalStateException("state=" + _state);
_state = ASYNC; _state = ASYNC;
_listener = readListener; _listener = readListener;
boolean content=nextContent()!=null;
if (content)
woken = _channelState.onReadReady();
else
_channelState.onReadUnready(); _channelState.onReadUnready();
content=nextContent()!=null;
} }
} }
catch(IOException e) catch(IOException e)
@ -520,11 +522,8 @@ public class HttpInput extends ServletInputStream implements Runnable
throw new RuntimeIOException(e); throw new RuntimeIOException(e);
} }
boolean woken = content && _channelState.onReadPossible();
// TODO something with woken?
if (woken) if (woken)
throw new IllegalStateException("How do we wake?"); wake();
} }
public boolean failed(Throwable x) public boolean failed(Throwable x)

View File

@ -113,6 +113,7 @@ public class HttpInputTest
} }
}) })
{ {
@Override @Override
public void onReadUnready() public void onReadUnready()
{ {
@ -126,6 +127,13 @@ public class HttpInputTest
_history.add("onReadPossible"); _history.add("onReadPossible");
return super.onReadPossible(); return super.onReadPossible();
} }
@Override
public boolean onReadReady()
{
_history.add("ready");
return super.onReadReady();
}
}) })
{ {
@Override @Override
@ -338,8 +346,8 @@ public class HttpInputTest
public void testAsyncEmpty() throws Exception public void testAsyncEmpty() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0")); assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
_in.run(); _in.run();
@ -362,8 +370,8 @@ public class HttpInputTest
public void testAsyncRead() throws Exception public void testAsyncRead() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0")); assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
_in.run(); _in.run();
@ -416,8 +424,8 @@ public class HttpInputTest
public void testAsyncEOF() throws Exception public void testAsyncEOF() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0")); assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
_in.run(); _in.run();
@ -432,8 +440,7 @@ public class HttpInputTest
assertThat(_in.read(),equalTo(-1)); assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true)); assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),equalTo("ready"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
} }
@ -441,8 +448,8 @@ public class HttpInputTest
public void testAsyncReadEOF() throws Exception public void testAsyncReadEOF() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0")); assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
_in.run(); _in.run();
@ -482,8 +489,7 @@ public class HttpInputTest
assertThat(_in.isFinished(),equalTo(false)); assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo(-1)); assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true)); assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),equalTo("ready"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true)); assertThat(_in.isReady(),equalTo(true));
@ -496,8 +502,8 @@ public class HttpInputTest
public void testAsyncError() throws Exception public void testAsyncError() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0")); assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue()); assertThat(_history.poll(),nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); assertThat(_history.poll(),equalTo("onDataAvailable"));

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
@ -610,10 +611,13 @@ public class AsyncIOServletTest
response.flushBuffer(); response.flushBuffer();
final AsyncContext async = request.startAsync(); final AsyncContext async = request.startAsync();
async.setTimeout(5000); async.setTimeout(500000);
final ServletInputStream in = request.getInputStream(); final ServletInputStream in = request.getInputStream();
final ServletOutputStream out = response.getOutputStream(); final ServletOutputStream out = response.getOutputStream();
if (request.getDispatcherType()==DispatcherType.ERROR)
throw new IllegalStateException();
in.setReadListener(new ReadListener() in.setReadListener(new ReadListener()
{ {
@Override @Override
@ -669,7 +673,7 @@ public class AsyncIOServletTest
try (Socket client = new Socket("localhost", connector.getLocalPort())) try (Socket client = new Socket("localhost", connector.getLocalPort()))
{ {
client.setSoTimeout(5000); client.setSoTimeout(500000);
OutputStream output = client.getOutputStream(); OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8")); output.write(request.getBytes("UTF-8"));
output.flush(); output.flush();