398467 Servlet 3.1 Non Blocking IO

Working async writes after HttpChannelState refactor
This commit is contained in:
Greg Wilkins 2013-05-25 15:45:16 +10:00
parent 94d1f4a020
commit c7be106423
6 changed files with 146 additions and 224 deletions

View File

@ -63,12 +63,8 @@ public class HttpChannelState
{
IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet
ASYNCSTARTED, // Suspend called, but not yet returned to container
REDISPATCHING, // resumed while dispatched
ASYNCWAIT, // Suspended and parked
REDISPATCH, // Has been scheduled
REDISPATCHED, // Request redispatched to filter/servlet
COMPLETECALLED,// complete called
ASYNCIO, // Has been scheduled
COMPLETING, // Request is completable
COMPLETED // Request is complete
}
@ -83,16 +79,24 @@ public class HttpChannelState
COMPLETE, // Complete the channel
RECYCLE, // Channel is completed
}
public enum Async
{
STARTED,
DISPATCH,
COMPLETE,
EXPIRING,
EXPIRED
}
private final HttpChannel<?> _channel;
private List<AsyncListener> _lastAsyncListeners;
private List<AsyncListener> _asyncListeners;
private State _state;
private Async _async;
private boolean _initial;
private boolean _expired;
private boolean _asyncIO;
private volatile boolean _responseWrapped;
private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
@ -100,6 +104,7 @@ public class HttpChannelState
{
_channel=channel;
_state=State.IDLE;
_async=null;
_initial=true;
}
@ -159,8 +164,8 @@ public class HttpChannelState
synchronized (this)
{
return _state+
(_initial?",initial":"")+
(_expired?",expired":"");
(_initial?",initial ":" ")+
_async;
}
}
@ -185,31 +190,45 @@ public class HttpChannelState
_asyncListeners=_lastAsyncListeners;
_lastAsyncListeners=null;
}
_responseWrapped=false;
return Action.REQUEST_DISPATCH;
case COMPLETECALLED:
_state=State.COMPLETING;
return Action.COMPLETE;
case COMPLETING:
return Action.COMPLETE;
case ASYNCWAIT:
if (_asyncIO)
{
_asyncIO=false;
return Action.IO_CALLBACK;
}
return Action.WAIT;
case COMPLETED:
return Action.RECYCLE;
case REDISPATCH:
_state=State.REDISPATCHED;
_responseWrapped=false;
return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH;
case ASYNCWAIT:
if (_asyncIO)
{
_state=State.ASYNCIO;
_asyncIO=false;
return Action.IO_CALLBACK;
}
if (_async!=null)
{
Async async=_async;
_async=null;
switch(async)
{
case COMPLETE:
_state=State.COMPLETING;
return Action.COMPLETE;
case DISPATCH:
_state=State.DISPATCHED;
return Action.ASYNC_DISPATCH;
case EXPIRING:
break;
case EXPIRED:
_state=State.DISPATCHED;
return Action.ASYNC_EXPIRED;
case STARTED:
throw new IllegalStateException(this.getStatusString());
}
}
return Action.WAIT;
default:
throw new IllegalStateException(this.getStatusString());
@ -217,30 +236,20 @@ public class HttpChannelState
}
}
public void startAsync(AsyncContextEvent event)
{
synchronized (this)
{
switch(_state)
{
case DISPATCHED:
case REDISPATCHED:
_expired=false;
_responseWrapped=event.getSuppliedResponse()!=_channel.getResponse();
_responseWrapped=false;
_event=event;
_state=State.ASYNCSTARTED;
List<AsyncListener> listeners=_lastAsyncListeners;
_lastAsyncListeners=_asyncListeners;
if (listeners!=null)
listeners.clear();
_asyncListeners=listeners;
break;
default:
throw new IllegalStateException(this.getStatusString());
}
if (_state!=State.DISPATCHED || _async!=null)
throw new IllegalStateException(this.getStatusString());
_async=Async.STARTED;
_event=event;
List<AsyncListener> listeners=_lastAsyncListeners;
_lastAsyncListeners=_asyncListeners;
if (listeners!=null)
listeners.clear();
_asyncListeners=listeners;
}
if (_lastAsyncListeners!=null)
@ -279,41 +288,49 @@ public class HttpChannelState
{
synchronized (this)
{
if (_asyncIO)
{
_asyncIO=false;
return Action.IO_CALLBACK;
}
switch(_state)
{
case REDISPATCHED:
case DISPATCHED:
_state=State.COMPLETING;
return Action.COMPLETE;
case IDLE:
throw new IllegalStateException(this.getStatusString());
case ASYNCSTARTED:
_initial=false;
_state=State.ASYNCWAIT;
scheduleTimeout();
return Action.WAIT;
case REDISPATCHING:
_initial=false;
_state=State.REDISPATCHED;
return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH;
case COMPLETECALLED:
_initial=false;
_state=State.COMPLETING;
return Action.COMPLETE;
case ASYNCIO:
break;
default:
throw new IllegalStateException(this.getStatusString());
}
if (_asyncIO)
{
_asyncIO=false;
_state=State.ASYNCIO;
return Action.IO_CALLBACK;
}
if (_async!=null)
{
_initial=false;
switch(_async)
{
case COMPLETE:
_state=State.COMPLETING;
_async=null;
return Action.COMPLETE;
case DISPATCH:
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_DISPATCH;
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_EXPIRED;
case EXPIRING:
case STARTED:
scheduleTimeout();
_state=State.ASYNCWAIT;
return Action.WAIT;
}
}
_state=State.COMPLETING;
return Action.COMPLETE;
}
}
@ -322,34 +339,26 @@ public class HttpChannelState
boolean dispatch;
synchronized (this)
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException(this.getStatusString());
_async=Async.DISPATCH;
_event.setDispatchTarget(context,path);
switch(_state)
{
case ASYNCSTARTED:
_state=State.REDISPATCHING;
_event.setDispatchTarget(context,path);
return;
case ASYNCWAIT:
dispatch=!_expired;
_state=State.REDISPATCH;
_event.setDispatchTarget(context,path);
case DISPATCHED:
case ASYNCIO:
dispatch=false;
break;
case REDISPATCH:
_event.setDispatchTarget(context,path);
return;
default:
throw new IllegalStateException(this.getStatusString());
dispatch=true;
break;
}
}
cancelTimeout();
if (dispatch)
{
cancelTimeout();
scheduleDispatch();
}
}
protected void expired()
@ -358,17 +367,11 @@ public class HttpChannelState
AsyncEvent event;
synchronized (this)
{
switch(_state)
{
case ASYNCSTARTED:
case ASYNCWAIT:
_expired=true;
event=_event;
aListeners=_asyncListeners;
break;
default:
return;
}
if (_async!=Async.STARTED)
return;
_async=Async.EXPIRING;
event=_event;
aListeners=_asyncListeners;
}
if (aListeners!=null)
@ -385,22 +388,20 @@ public class HttpChannelState
}
}
}
boolean dispatch=false;
synchronized (this)
{
switch(_state)
if (_async==Async.EXPIRING)
{
case ASYNCSTARTED:
case ASYNCWAIT:
_state=State.REDISPATCH;
break;
default:
_expired=false;
break;
_async=Async.EXPIRED;
if (_state==State.ASYNCWAIT)
dispatch=true;
}
}
scheduleDispatch();
if (dispatch)
scheduleDispatch();
}
public void complete()
@ -409,30 +410,15 @@ public class HttpChannelState
boolean handle;
synchronized (this)
{
switch(_state)
{
case DISPATCHED:
case REDISPATCHED:
throw new IllegalStateException(this.getStatusString());
case IDLE:
case ASYNCSTARTED:
_state=State.COMPLETECALLED;
return;
case ASYNCWAIT:
_state=State.COMPLETECALLED;
handle=!_expired;
break;
default:
throw new IllegalStateException(this.getStatusString());
}
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException(this.getStatusString());
_async=Async.COMPLETE;
handle=_state==State.ASYNCWAIT;
}
cancelTimeout();
if (handle)
{
cancelTimeout();
ContextHandler handler=getContextHandler();
if (handler!=null)
handler.handle(_channel);
@ -493,14 +479,14 @@ public class HttpChannelState
switch(_state)
{
case DISPATCHED:
case REDISPATCHED:
case ASYNCIO:
throw new IllegalStateException(getStatusString());
default:
_state=State.IDLE;
break;
}
_state=State.IDLE;
_async=null;
_initial = true;
_expired=false;
_responseWrapped=false;
cancelTimeout();
_timeoutMs=DEFAULT_TIMEOUT;
_event=null;
@ -531,7 +517,7 @@ public class HttpChannelState
{
synchronized (this)
{
return _expired;
return _async==Async.EXPIRED;
}
}
@ -547,17 +533,7 @@ public class HttpChannelState
{
synchronized(this)
{
switch(_state)
{
case ASYNCSTARTED:
case REDISPATCHING:
case COMPLETECALLED:
case ASYNCWAIT:
return true;
default:
return false;
}
return _state==State.ASYNCWAIT || _state==State.DISPATCHED && _async==Async.STARTED;
}
}
@ -572,18 +548,8 @@ public class HttpChannelState
public boolean isAsyncStarted()
{
synchronized (this)
{
switch(_state)
{
case ASYNCSTARTED:
case REDISPATCHING:
case COMPLETECALLED:
case ASYNCWAIT:
return true;
default:
return false;
}
{
return _async==Async.STARTED;
}
}
@ -591,19 +557,7 @@ public class HttpChannelState
{
synchronized (this)
{
switch(_state)
{
case ASYNCSTARTED:
case REDISPATCHING:
case ASYNCWAIT:
case REDISPATCHED:
case REDISPATCH:
case COMPLETECALLED:
return true;
default:
return false;
}
return !_initial || _async!=null;
}
}
@ -631,7 +585,7 @@ public class HttpChannelState
public ServletResponse getServletResponse()
{
if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
if (_event!=null && _event.getSuppliedResponse()!=null)
return _event.getSuppliedResponse();
return _channel.getResponse();
}
@ -653,30 +607,13 @@ public class HttpChannelState
public void asyncIO()
{
boolean handle=false;
boolean handle;
synchronized (this)
{
switch(_state)
{
case IDLE:
throw new IllegalStateException();
case ASYNCWAIT:
_asyncIO=true;
handle=true;
break;
case ASYNCSTARTED:
case REDISPATCHING:
case REDISPATCHED:
case REDISPATCH:
case COMPLETECALLED:
case COMPLETED:
case COMPLETING:
case DISPATCHED:
_asyncIO=true;
}
_asyncIO=true;
handle=_state==State.ASYNCWAIT;
}
if (handle)

View File

@ -217,7 +217,6 @@ write completed - - - ASYNC READY->owp
// Async or Blocking ?
while(true)
{
System.err.println("write "+_state);
switch(_state.get())
{
case OPEN:
@ -246,7 +245,6 @@ write completed - - - ASYNC READY->owp
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException();
System.err.println("async complete ASYNC");
return;
}
@ -257,7 +255,6 @@ write completed - - - ASYNC READY->owp
// Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,complete).process();
System.err.println("async scheduled "+_state);
return;
case PENDING:
@ -567,14 +564,12 @@ write completed - - - ASYNC READY->owp
case ASYNC:
if (!_state.compareAndSet(State.ASYNC, State.READY))
continue;
System.err.println("isReady ASYNC -> READY");
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.UNREADY))
continue;
System.err.println("isReady PENDING -> UNREADY");
return false;
case UNREADY:
return false;
@ -625,24 +620,20 @@ write completed - - - ASYNC READY->owp
@Override
protected boolean process()
{
System.err.println("AsyncWrite#process "+_state);
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
System.err.println("write aggregate "+BufferUtil.toDetailString(_aggregate));
_channel.write(_aggregate, _complete && _len==0, this);
return false;
}
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_aggregate.capacity()/4)
{
System.err.println("append aggregate");
BufferUtil.append(_aggregate, _b, _off, _len);
}
else if (_len>0 && !_flushed)
{
ByteBuffer buffer=ByteBuffer.wrap(_b, _off, _len);
System.err.println("write buffer "+_complete+" "+BufferUtil.toDetailString(buffer));
_flushed=true;
_channel.write(buffer, _complete, this);
return false;
@ -665,8 +656,6 @@ write completed - - - ASYNC READY->owp
@Override
protected boolean process()
{
System.err.println("AsyncFlush#process "+_state);
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
@ -697,13 +686,11 @@ write completed - - - ASYNC READY->owp
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue;
System.err.println("AsyncFlush#completed "+last+" -> "+_state);
break;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
continue;
System.err.println("AsyncFlush#completed "+last+" -> "+_state);
_channel.getState().asyncIO();
break;
@ -720,7 +707,6 @@ write completed - - - ASYNC READY->owp
}
catch (Exception e)
{
e.printStackTrace();
_onError=e;
_channel.getState().asyncIO();
}

View File

@ -954,7 +954,9 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
if (old_context != _scontext)
{
// check the target.
if (DispatcherType.REQUEST.equals(dispatch) || DispatcherType.ASYNC.equals(dispatch) || (DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isExpired()))
if (DispatcherType.REQUEST.equals(dispatch) ||
DispatcherType.ASYNC.equals(dispatch) ||
DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isAsync())
{
if (_compactPath)
target = URIUtil.compactPath(target);

View File

@ -115,7 +115,7 @@ public class AsyncServletIOTest
@Test
public void testBigWrites() throws Exception
{
List<String> list=process(102400,102400,102400,102400,102400,102400,102400,102400,102400,102400);
List<String> list=process(102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400);
int blocked=0;
for (String line:list)
{
@ -160,21 +160,20 @@ public class AsyncServletIOTest
try (Socket socket = new Socket("localhost",port);)
{
socket.setSoTimeout(1000000);
socket.setReceiveBufferSize(2048);
socket.getOutputStream().write(request.toString().getBytes("ISO-8859-1"));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
System.err.println("line: "+line);
//System.err.println("line: "+line);
Assert.assertThat(line,Matchers.startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
System.err.println("line: "+line);
//System.err.println("line: "+line);
if (line.length()==0)
break;
}
@ -185,7 +184,7 @@ public class AsyncServletIOTest
line = in.readLine();
if (line==null)
break;
System.err.println("line: "+line.length()+"\t"+(line.length()>40?(line.substring(0,40)+"..."):line));
//System.err.println("line: "+line.length()+"\t"+(line.length()>40?(line.substring(0,40)+"..."):line));
list.add(line);
}
}
@ -232,7 +231,7 @@ public class AsyncServletIOTest
@Override
public void onWritePossible() throws IOException
{
System.err.println("OWP");
//System.err.println("OWP");
out.write(_owp);
while (writes!=null && _w< writes.length)
@ -258,7 +257,7 @@ public class AsyncServletIOTest
if (!out.isReady())
return;
System.err.println("COMPLETE!!!");
//System.err.println("COMPLETE!!!");
async.complete();
}

View File

@ -83,7 +83,6 @@ public abstract class IteratingCallback implements Callback
}
catch(Exception e)
{
e.printStackTrace();
_iterating.set(false);
failed(e);
}

View File

@ -57,7 +57,6 @@ public abstract class IteratingNestedCallback extends IteratingCallback
@Override
public void failed(Throwable x)
{
x.printStackTrace();
_callback.failed(x);
}