398467 Servlet 3.1 Non Blocking IO

Check point progress as I dispair at the insane complexity of servlet 3.1 specification
This commit is contained in:
Greg Wilkins 2013-05-24 14:34:27 +10:00
parent 6f8ed5e809
commit 94d1f4a020
7 changed files with 479 additions and 136 deletions

View File

@ -90,8 +90,8 @@ public class HttpChannelState
private State _state; private State _state;
private boolean _initial; private boolean _initial;
private boolean _dispatched;
private boolean _expired; private boolean _expired;
private boolean _asyncIO;
private volatile boolean _responseWrapped; private volatile boolean _responseWrapped;
private long _timeoutMs=DEFAULT_TIMEOUT; private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event; private AsyncContextEvent _event;
@ -160,7 +160,6 @@ public class HttpChannelState
{ {
return _state+ return _state+
(_initial?",initial":"")+ (_initial?",initial":"")+
(_dispatched?",resumed":"")+
(_expired?",expired":""); (_expired?",expired":"");
} }
} }
@ -197,6 +196,11 @@ public class HttpChannelState
return Action.COMPLETE; return Action.COMPLETE;
case ASYNCWAIT: case ASYNCWAIT:
if (_asyncIO)
{
_asyncIO=false;
return Action.IO_CALLBACK;
}
return Action.WAIT; return Action.WAIT;
case COMPLETED: case COMPLETED:
@ -222,7 +226,6 @@ public class HttpChannelState
{ {
case DISPATCHED: case DISPATCHED:
case REDISPATCHED: case REDISPATCHED:
_dispatched=false;
_expired=false; _expired=false;
_responseWrapped=event.getSuppliedResponse()!=_channel.getResponse(); _responseWrapped=event.getSuppliedResponse()!=_channel.getResponse();
_responseWrapped=false; _responseWrapped=false;
@ -276,6 +279,12 @@ public class HttpChannelState
{ {
synchronized (this) synchronized (this)
{ {
if (_asyncIO)
{
_asyncIO=false;
return Action.IO_CALLBACK;
}
switch(_state) switch(_state)
{ {
case REDISPATCHED: case REDISPATCHED:
@ -319,14 +328,12 @@ public class HttpChannelState
case ASYNCSTARTED: case ASYNCSTARTED:
_state=State.REDISPATCHING; _state=State.REDISPATCHING;
_event.setDispatchTarget(context,path); _event.setDispatchTarget(context,path);
_dispatched=true;
return; return;
case ASYNCWAIT: case ASYNCWAIT:
dispatch=!_expired; dispatch=!_expired;
_state=State.REDISPATCH; _state=State.REDISPATCH;
_event.setDispatchTarget(context,path); _event.setDispatchTarget(context,path);
_dispatched=true;
break; break;
case REDISPATCH: case REDISPATCH:
@ -345,14 +352,6 @@ public class HttpChannelState
} }
} }
public boolean isDispatched()
{
synchronized (this)
{
return _dispatched;
}
}
protected void expired() protected void expired()
{ {
final List<AsyncListener> aListeners; final List<AsyncListener> aListeners;
@ -500,12 +499,12 @@ public class HttpChannelState
_state=State.IDLE; _state=State.IDLE;
} }
_initial = true; _initial = true;
_dispatched=false;
_expired=false; _expired=false;
_responseWrapped=false; _responseWrapped=false;
cancelTimeout(); cancelTimeout();
_timeoutMs=DEFAULT_TIMEOUT; _timeoutMs=DEFAULT_TIMEOUT;
_event=null; _event=null;
_asyncIO=false;
} }
} }
@ -652,11 +651,42 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute); _channel.getRequest().setAttribute(name,attribute);
} }
public void asyncIO() public void asyncIO()
{ {
// TODO Auto-generated method stub boolean handle=false;
synchronized (this)
{
switch(_state)
{
case IDLE:
throw new IllegalStateException();
case ASYNCWAIT:
_asyncIO=true;
handle=true;
break;
case ASYNCSTARTED:
case REDISPATCHING:
case REDISPATCHED:
case REDISPATCH:
case COMPLETECALLED:
case COMPLETED:
case COMPLETING:
case DISPATCHED:
_asyncIO=true;
}
}
if (handle)
{
ContextHandler handler=getContextHandler();
if (handler!=null)
handler.handle(_channel);
else
_channel.handle();
}
} }
public class AsyncTimeout implements Runnable public class AsyncTimeout implements Runnable

View File

@ -41,7 +41,7 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -582,7 +582,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
} }
private class CommitCallback extends IteratingCallback private class CommitCallback extends IteratingNestedCallback
{ {
final ByteBuffer _content; final ByteBuffer _content;
final boolean _lastContent; final boolean _lastContent;
@ -703,7 +703,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
} }
private class ContentCallback extends IteratingCallback private class ContentCallback extends IteratingNestedCallback
{ {
final ByteBuffer _content; final ByteBuffer _content;
final boolean _lastContent; final boolean _lastContent;

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -38,6 +37,7 @@ import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -217,6 +217,7 @@ write completed - - - ASYNC READY->owp
// Async or Blocking ? // Async or Blocking ?
while(true) while(true)
{ {
System.err.println("write "+_state);
switch(_state.get()) switch(_state.get())
{ {
case OPEN: case OPEN:
@ -245,6 +246,7 @@ write completed - - - ASYNC READY->owp
{ {
if (!_state.compareAndSet(State.PENDING, State.ASYNC)) if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException(); throw new IllegalStateException();
System.err.println("async complete ASYNC");
return; return;
} }
@ -255,6 +257,7 @@ write completed - - - ASYNC READY->owp
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,complete).process(); new AsyncWrite(b,off,len,complete).process();
System.err.println("async scheduled "+_state);
return; return;
case PENDING: case PENDING:
@ -564,12 +567,14 @@ write completed - - - ASYNC READY->owp
case ASYNC: case ASYNC:
if (!_state.compareAndSet(State.ASYNC, State.READY)) if (!_state.compareAndSet(State.ASYNC, State.READY))
continue; continue;
System.err.println("isReady ASYNC -> READY");
return true; return true;
case READY: case READY:
return true; return true;
case PENDING: case PENDING:
if (!_state.compareAndSet(State.PENDING, State.UNREADY)) if (!_state.compareAndSet(State.PENDING, State.UNREADY))
continue; continue;
System.err.println("isReady PENDING -> UNREADY");
return false; return false;
case UNREADY: case UNREADY:
return false; return false;
@ -581,35 +586,33 @@ write completed - - - ASYNC READY->owp
public void handle() public void handle()
{ {
if (_state.get()==State.READY)
{
try
{
_writeListener.onWritePossible();
return;
}
catch(Exception e)
{
_onError=e;
}
}
if(_onError!=null) if(_onError!=null)
{ {
Throwable th=_onError; Throwable th=_onError;
_onError=null; _onError=null;
_writeListener.onError(th); _writeListener.onError(th);
close(); close();
} }
if (_state.get()==State.READY)
{
try
{
_writeListener.onWritePossible();
}
catch (IOException e)
{
_writeListener.onError(e);
close();
}
}
} }
private class AsyncWrite implements Callback private class AsyncWrite extends AsyncFlush
{ {
private final byte[] _b; private final byte[] _b;
private final int _off; private final int _off;
private final int _len; private final int _len;
private final boolean _complete; private final boolean _complete;
private boolean _flushed;
public AsyncWrite(byte[] b, int off, int len, boolean complete) public AsyncWrite(byte[] b, int off, int len, boolean complete)
{ {
@ -619,138 +622,119 @@ write completed - - - ASYNC READY->owp
_complete=complete; _complete=complete;
} }
public void process() @Override
protected boolean process()
{ {
System.err.println("AsyncWrite#process "+_state);
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
System.err.println("write aggregate "+BufferUtil.toDetailString(_aggregate));
_channel.write(_aggregate, _complete && _len==0, this); _channel.write(_aggregate, _complete && _len==0, this);
return; return false;
} }
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_aggregate.capacity()/4) if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_aggregate.capacity()/4)
{
System.err.println("append aggregate");
BufferUtil.append(_aggregate, _b, _off, _len); BufferUtil.append(_aggregate, _b, _off, _len);
}
else if (_len>0 && !_flushed) else if (_len>0 && !_flushed)
{ {
_channel.write(ByteBuffer.wrap(_b, _off, _len), _complete,this); ByteBuffer buffer=ByteBuffer.wrap(_b, _off, _len);
return; System.err.println("write buffer "+_complete+" "+BufferUtil.toDetailString(buffer));
_flushed=true;
_channel.write(buffer, _complete, this);
return false;
} }
try if (_complete)
{ closed();
if (_complete) return true;
{
closed();
_channel.getResponse().closeOutput();
}
while(true)
{
switch(_state.get())
{
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue;
return;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
continue;
_channel.getState().asyncIO();
return;
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
}
}
catch (Exception e)
{
_onError=e;
_channel.getState().asyncIO();
close();
}
}
@Override
public void succeeded()
{
process();
}
@Override
public void failed(Throwable e)
{
_onError=e;
_channel.getState().asyncIO();
} }
} }
private class AsyncFlush implements Callback private class AsyncFlush extends IteratingCallback
{ {
private boolean _flushed; protected boolean _flushed;
public AsyncFlush() public AsyncFlush()
{ {
} }
public void process() @Override
protected boolean process()
{ {
// flush any content from the aggregate System.err.println("AsyncFlush#process "+_state);
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_flushed=true; _flushed=true;
_channel.write(_aggregate, false, this); _channel.write(_aggregate, false, this);
return; return false;
} }
if (!_flushed) if (!_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER,false,this); _channel.write(BufferUtil.EMPTY_BUFFER,false,this);
return false;
}
return true;
}
@Override
protected void completed()
{
try try
{ {
while(true) loop: while(true)
{ {
switch(_state.get()) State last=_state.get();
switch(last)
{ {
case PENDING: case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC)) if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue; continue;
return; System.err.println("AsyncFlush#completed "+last+" -> "+_state);
break;
case UNREADY: case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY)) if (!_state.compareAndSet(State.UNREADY, State.READY))
continue; continue;
System.err.println("AsyncFlush#completed "+last+" -> "+_state);
_channel.getState().asyncIO(); _channel.getState().asyncIO();
return; break;
case CLOSED: case CLOSED:
throw new EofException("Closed"); _onError=new EofException("Closed");
break;
default: default:
throw new IllegalStateException(); throw new IllegalStateException();
} }
break loop;
} }
} }
catch (Exception e) catch (Exception e)
{ {
e.printStackTrace();
_onError=e; _onError=e;
_channel.getState().asyncIO(); _channel.getState().asyncIO();
} }
} }
@Override @Override
public void succeeded() public void failed(Throwable e)
{
process();
}
@Override
public void failed(Throwable e)
{ {
e.printStackTrace();
_onError=e; _onError=e;
_channel.getState().asyncIO(); _channel.getState().asyncIO();
} }
} }
@ -762,7 +746,7 @@ write completed - - - ASYNC READY->owp
* be notified as each buffer is written and only once all the input is consumed will the * be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called. * wrapped {@link Callback#succeeded()} method be called.
*/ */
private class InputStreamWritingCB extends IteratingCallback private class InputStreamWritingCB extends IteratingNestedCallback
{ {
final InputStream _in; final InputStream _in;
final ByteBuffer _buffer; final ByteBuffer _buffer;
@ -819,7 +803,7 @@ write completed - - - ASYNC READY->owp
* be notified as each buffer is written and only once all the input is consumed will the * be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called. * wrapped {@link Callback#succeeded()} method be called.
*/ */
private class ReadableByteChannelWritingCB extends IteratingCallback private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{ {
final ReadableByteChannel _in; final ReadableByteChannel _in;
final ByteBuffer _buffer; final ByteBuffer _buffer;
@ -867,5 +851,4 @@ write completed - - - ASYNC READY->owp
} }
} }
} }

View File

@ -81,7 +81,6 @@ public class StatisticsHandler extends HandlerWrapper
@Override @Override
public void onComplete(AsyncEvent event) throws IOException public void onComplete(AsyncEvent event) throws IOException
{ {
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState(); HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();
Request request = state.getBaseRequest(); Request request = state.getBaseRequest();
@ -92,8 +91,7 @@ public class StatisticsHandler extends HandlerWrapper
updateResponse(request); updateResponse(request);
if (!state.isDispatched()) _asyncWaitStats.decrement();
_asyncWaitStats.decrement();
} }
}; };
@ -139,9 +137,7 @@ public class StatisticsHandler extends HandlerWrapper
{ {
// resumed request // resumed request
start = System.currentTimeMillis(); start = System.currentTimeMillis();
_asyncWaitStats.decrement(); _asyncDispatches.incrementAndGet();
if (state.isDispatched())
_asyncDispatches.incrementAndGet();
} }
try try
@ -159,8 +155,10 @@ public class StatisticsHandler extends HandlerWrapper
if (state.isSuspended()) if (state.isSuspended())
{ {
if (state.isInitial()) if (state.isInitial())
{
state.addListener(_onCompletion); state.addListener(_onCompletion);
_asyncWaitStats.increment(); _asyncWaitStats.increment();
}
} }
else if (state.isInitial()) else if (state.isInitial())
{ {

View File

@ -0,0 +1,276 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlet;
import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AsyncServletIOTest
{
protected AsyncIOServlet _servlet=new AsyncIOServlet();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
protected ServerConnector _connector;
@Before
public void setUp() throws Exception
{
HttpConfiguration http_config = new HttpConfiguration();
http_config.setOutputBufferSize(4096);
_connector = new ServerConnector(_server,new HttpConnectionFactory(http_config));
_server.setConnectors(new Connector[]{ _connector });
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY|ServletContextHandler.NO_SESSIONS);
context.setContextPath("/ctx");
_server.setHandler(context);
_servletHandler=context.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet);
holder.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder,"/path/*");
_server.start();
_port=_connector.getLocalPort();
}
@After
public void tearDown() throws Exception
{
_server.stop();
}
@Test
public void testEmpty() throws Exception
{
process();
}
@Test
public void testWrite() throws Exception
{
process(10);
}
@Test
public void testWrites() throws Exception
{
process(10,1,20,10);
}
@Test
public void testWritesFlushWrites() throws Exception
{
process(10,1,0,20,10);
}
@Test
public void testBigWrite() throws Exception
{
process(102400);
}
@Test
public void testBigWrites() throws Exception
{
List<String> list=process(102400,102400,102400,102400,102400,102400,102400,102400,102400,102400);
int blocked=0;
for (String line:list)
{
if ("-".equals(line))
blocked++;
}
Assert.assertThat(blocked,Matchers.greaterThan(1));
}
protected void assertContains(String content,String response)
{
Assert.assertThat(response,Matchers.containsString(content));
}
protected void assertNotContains(String content,String response)
{
Assert.assertThat(response,Matchers.not(Matchers.containsString(content)));
}
public synchronized List<String> process(int... writes) throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path/info");
char s='?';
for (int w: writes)
{
request.append(s).append("w=").append(w);
s='&';
}
request.append(" HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n")
.append("\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port);)
{
socket.setSoTimeout(1000000);
socket.setReceiveBufferSize(2048);
socket.getOutputStream().write(request.toString().getBytes("ISO-8859-1"));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
System.err.println("line: "+line);
Assert.assertThat(line,Matchers.startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
System.err.println("line: "+line);
if (line.length()==0)
break;
}
// Get body slowly
while (true)
{
line = in.readLine();
if (line==null)
break;
System.err.println("line: "+line.length()+"\t"+(line.length()>40?(line.substring(0,40)+"..."):line));
list.add(line);
}
}
// check lines
int w=0;
for (String line : list)
{
if ("-".equals(line))
continue;
assertEquals(writes[w],line.length());
assertEquals(line.charAt(0),'0'+(w%10));
w++;
if (w<writes.length && writes[w]<=0)
w++;
}
return list;
}
private static class AsyncIOServlet extends HttpServlet
{
private static final long serialVersionUID = -8161977157098646562L;
public AsyncIOServlet()
{}
/* ------------------------------------------------------------ */
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final String[] writes = request.getParameterValues("w");
final AsyncContext async = request.startAsync();
final ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
byte[] _owp="-\n".getBytes("ISO-8859-1");
int _w=0;
@Override
public void onWritePossible() throws IOException
{
System.err.println("OWP");
out.write(_owp);
while (writes!=null && _w< writes.length)
{
if (!out.isReady())
return;
int write=Integer.valueOf(writes[_w++]);
if (write==0)
out.flush();
else
{
byte[] buf=new byte[write+1];
Arrays.fill(buf,(byte)('0'+((_w-1)%10)));
buf[write]='\n';
out.write(buf);
}
if (!out.isReady())
return;
}
if (!out.isReady())
return;
System.err.println("COMPLETE!!!");
async.complete();
}
@Override
public void onError(Throwable t)
{
async.complete();
}
});
}
}
}

View File

@ -23,32 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Iterating Callback. /** Iterating Callback.
* <p>This specialised callback is used when breaking up an * <p>This specialized callback is used when breaking up an
* asynchronous task into smaller asynchronous tasks. A typical pattern * asynchronous task into smaller asynchronous tasks. A typical pattern
* is that a successful callback is used to schedule the next sub task, but * is that a successful callback is used to schedule the next sub task, but
* if that task completes quickly and uses the calling thread to callback * if that task completes quickly and uses the calling thread to callback
* the success notification, this can result in a growing stack depth. * the success notification, this can result in a growing stack depth.
* </p> * </p>
* <p>To avoid this issue, this callback uses an Atomicboolean to note * <p>To avoid this issue, this callback uses an AtomicBoolean to note
* if the success callback has been called during the processing of a * if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses. * sub task, and if so then the processing iterates rather than recurses.
* </p> * </p>
* <p>This callback is passed to the asynchronous handling of each sub * <p>This callback is passed to the asynchronous handling of each sub
* task and a call the {@link #succeeded()} on this call back represents * task and a call the {@link #succeeded()} on this call back represents
* completion of the subtask. Only once all the subtasks are completed is * completion of the subtask. Only once all the subtasks are completed is
* the {@link Callback#succeeded()} method called on the {@link Callback} instance * the {#completed()} method called.</p>
* passed the the {@link #IteratingCallback(Callback)} constructor.</p>
* *
*/ */
public abstract class IteratingCallback implements Callback public abstract class IteratingCallback implements Callback
{ {
final AtomicBoolean _iterating = new AtomicBoolean(); private final AtomicBoolean _iterating = new AtomicBoolean();
final Callback _callback;
public IteratingCallback()
public IteratingCallback(Callback callback)
{ {
_callback=callback;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -63,6 +59,8 @@ public abstract class IteratingCallback implements Callback
*/ */
abstract protected boolean process() throws Exception; abstract protected boolean process() throws Exception;
abstract protected void completed();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** This method is called initially to start processing and /** This method is called initially to start processing and
* is then called by subsequent sub task success to continue * is then called by subsequent sub task success to continue
@ -78,34 +76,28 @@ public abstract class IteratingCallback implements Callback
// process and test if we are complete // process and test if we are complete
if (process()) if (process())
{ {
_callback.succeeded(); completed();
return; return;
} }
} }
} }
catch(Exception e) catch(Exception e)
{ {
e.printStackTrace();
_iterating.set(false); _iterating.set(false);
_callback.failed(e); failed(e);
} }
finally finally
{ {
_iterating.set(false); _iterating.set(false);
} }
} }
/* ------------------------------------------------------------ */
@Override @Override
public void succeeded() public void succeeded()
{ {
if (!_iterating.compareAndSet(true,false)) if (!_iterating.compareAndSet(true,false))
iterate(); iterate();
} }
@Override
public void failed(Throwable x)
{
_callback.failed(x);
}
} }

View File

@ -0,0 +1,64 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
/* ------------------------------------------------------------ */
/** Iterating Nested Callback.
* <p>This specialized callback is used when breaking up an
* asynchronous task into smaller asynchronous tasks. A typical pattern
* is that a successful callback is used to schedule the next sub task, but
* if that task completes quickly and uses the calling thread to callback
* the success notification, this can result in a growing stack depth.
* </p>
* <p>To avoid this issue, this callback uses an AtomicBoolean to note
* if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses.
* </p>
* <p>This callback is passed to the asynchronous handling of each sub
* task and a call the {@link #succeeded()} on this call back represents
* completion of the subtask. Only once all the subtasks are completed is
* the {@link Callback#succeeded()} method called on the {@link Callback} instance
* passed the the {@link #IteratingCallback(Callback)} constructor.</p>
*
*/
public abstract class IteratingNestedCallback extends IteratingCallback
{
final Callback _callback;
public IteratingNestedCallback(Callback callback)
{
_callback=callback;
}
@Override
protected void completed()
{
_callback.succeeded();
}
/* ------------------------------------------------------------ */
@Override
public void failed(Throwable x)
{
x.printStackTrace();
_callback.failed(x);
}
}