jetty-9 lots of renaming

This commit is contained in:
Greg Wilkins 2012-06-06 21:35:51 +02:00
parent f6643ccbe7
commit 3f80ec2bc2
14 changed files with 92 additions and 88 deletions

View File

@ -14,8 +14,8 @@ import org.eclipse.jetty.util.log.Logger;
/** A convenience base implementation of {@link AsyncConnection}. /** A convenience base implementation of {@link AsyncConnection}.
* <p> * <p>
* This class uses the capabilities of the {@link AsyncEndPoint} API to provide a * This class uses the capabilities of the {@link AsyncEndPoint} API to provide a
* more traditional style of async reading. A call to {@link #readInterested()} * more traditional style of async reading. A call to {@link #fillInterested()}
* will schedule a callback to {@link #onReadable()} or {@link #onReadFail(Throwable)} * will schedule a callback to {@link #onFillable()} or {@link #onReadFail(Throwable)}
* as appropriate. * as appropriate.
*/ */
public abstract class AbstractAsyncConnection implements AsyncConnection public abstract class AbstractAsyncConnection implements AsyncConnection
@ -44,7 +44,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
protected void onCompleted(Void context) protected void onCompleted(Void context)
{ {
if (_readInterested.compareAndSet(true,false)) if (_readInterested.compareAndSet(true,false))
onReadable(); onFillable();
} }
@Override @Override
@ -71,17 +71,19 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Call to register read interest. /** Call to register read interest.
* After this call, {@link #onReadable()} or {@link #onReadFail(Throwable)} * After this call, {@link #onFillable()} or {@link #onReadFail(Throwable)}
* will be called back as appropriate. * will be called back as appropriate.
*/ */
public void readInterested() public void fillInterested()
{ {
if (_readInterested.compareAndSet(false,true)) if (_readInterested.compareAndSet(false,true))
getEndPoint().readable(null,_readCallback); getEndPoint().fillInterested(null,_readCallback);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public abstract void onReadable(); /** Callback when the endpoint is fillable.
*/
public abstract void onFillable();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void onReadFail(Throwable cause) public void onReadFail(Throwable cause)

View File

@ -95,7 +95,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
} }
@Override @Override
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{ {
_readInterest.register(context,callback); _readInterest.register(context,callback);
} }

View File

@ -24,14 +24,14 @@ import org.eclipse.jetty.util.FutureCallback;
* from: * from:
* <blockquote><pre> * <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>(); * FutureCallback<String> future = new FutureCallback<>();
* endpoint.readable("ContextObj",future); * endpoint.fillInterested("ContextObj",future);
* ... * ...
* String context = future.get(); // This blocks * String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);</pre></blockquote> * int filled=endpoint.fill(mybuffer);</pre></blockquote>
* <h3>Dispatched Read</h3> * <h3>Dispatched Read</h3>
* By using a different callback, the read can be done asynchronously in its own dispatched thread: * By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre> * <blockquote><pre>
* endpoint.readable("ContextObj",new ExecutorCallback<String>(executor) * endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor)
* { * {
* public void onCompleted(String context) * public void onCompleted(String context)
* { * {
@ -71,14 +71,14 @@ import org.eclipse.jetty.util.FutureCallback;
public interface AsyncEndPoint extends EndPoint public interface AsyncEndPoint extends EndPoint
{ {
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Asynchronous a readable notification. /** Asynchronous a fillable notification.
* <p> * <p>
* This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF. * This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF.
* @param context Context to return via the callback * @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable. * @param callback The callback to call when an error occurs or we are readable.
* @throws ReadPendingException if another read operation is concurrent. * @throws ReadPendingException if another read operation is concurrent.
*/ */
<C> void readable(C context, Callback<C> callback) throws ReadPendingException; <C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Asynchronous write operation. /** Asynchronous write operation.

View File

@ -10,7 +10,7 @@ import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* A Utility class to help implement {@link AsyncEndPoint#readable(Object, Callback)} * A Utility class to help implement {@link AsyncEndPoint#fillInterested(Object, Callback)}
* by keeping state and calling the context and callback objects. * by keeping state and calling the context and callback objects.
* *
*/ */

View File

@ -87,7 +87,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{ {
_readInterest.register(context,callback); _readInterest.register(context,callback);
} }

View File

@ -124,7 +124,7 @@ public class SslConnection extends AbstractAsyncConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void onReadable() public void onFillable()
{ {
LOG.debug("{} onReadable",this); LOG.debug("{} onReadable",this);
@ -262,7 +262,7 @@ public class SslConnection extends AbstractAsyncConnection
} }
else else
// Normal readable callback // Normal readable callback
readInterested(); SslConnection.this.fillInterested();
return false; return false;
} }
@ -285,7 +285,7 @@ public class SslConnection extends AbstractAsyncConnection
} }
else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP ) else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP )
// we are actually read blocked in order to write // we are actually read blocked in order to write
readInterested(); SslConnection.this.fillInterested();
else else
{ {
// try the flush again // try the flush again
@ -307,7 +307,7 @@ public class SslConnection extends AbstractAsyncConnection
} }
@Override @Override
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{ {
_readInterest.register(context,callback); _readInterest.register(context,callback);
} }

View File

@ -30,14 +30,14 @@ public class AsyncByteArrayEndPointTest
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<String> fcb = new FutureCallback<>(); FutureCallback<String> fcb = new FutureCallback<>();
endp.readable("CTX",fcb); endp.fillInterested("CTX",fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX",fcb.get());
assertEquals(10,endp.fill(buffer)); assertEquals(10,endp.fill(buffer));
assertEquals("test input",BufferUtil.toString(buffer)); assertEquals("test input",BufferUtil.toString(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.readable("CTX",fcb); endp.fillInterested("CTX",fcb);
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0,endp.fill(buffer)); assertEquals(0,endp.fill(buffer));
@ -48,7 +48,7 @@ public class AsyncByteArrayEndPointTest
assertEquals("test input more",BufferUtil.toString(buffer)); assertEquals("test input more",BufferUtil.toString(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.readable("CTX",fcb); endp.fillInterested("CTX",fcb);
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0,endp.fill(buffer)); assertEquals(0,endp.fill(buffer));
@ -58,7 +58,7 @@ public class AsyncByteArrayEndPointTest
assertEquals(-1,endp.fill(buffer)); assertEquals(-1,endp.fill(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.readable("CTX",fcb); endp.fillInterested("CTX",fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX",fcb.get());
assertEquals(-1,endp.fill(buffer)); assertEquals(-1,endp.fill(buffer));
@ -66,7 +66,7 @@ public class AsyncByteArrayEndPointTest
endp.close(); endp.close();
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.readable("CTX",fcb); endp.fillInterested("CTX",fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
try try
{ {
@ -126,7 +126,7 @@ public class AsyncByteArrayEndPointTest
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<Void> fcb = new FutureCallback<>(); FutureCallback<Void> fcb = new FutureCallback<>();
endp.readable(null,fcb); endp.fillInterested(null,fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals(null,fcb.get()); assertEquals(null,fcb.get());
assertEquals(4,endp.fill(buffer)); assertEquals(4,endp.fill(buffer));
@ -134,7 +134,7 @@ public class AsyncByteArrayEndPointTest
// read timeout // read timeout
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.readable(null,fcb); endp.fillInterested(null,fcb);
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
try try
{ {

View File

@ -129,11 +129,11 @@ public class SelectChannelEndPointTest
@Override @Override
public void onOpen() public void onOpen()
{ {
readInterested(); fillInterested();
} }
@Override @Override
public synchronized void onReadable() public synchronized void onFillable()
{ {
AsyncEndPoint _endp = getEndPoint(); AsyncEndPoint _endp = getEndPoint();
try try
@ -155,7 +155,7 @@ public class SelectChannelEndPointTest
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{ {
FutureCallback<Void> blockingRead= new FutureCallback<>(); FutureCallback<Void> blockingRead= new FutureCallback<>();
_endp.readable(null,blockingRead); _endp.fillInterested(null,blockingRead);
blockingRead.get(); blockingRead.get();
filled=_endp.fill(_in); filled=_endp.fill(_in);
progress|=filled>0; progress|=filled>0;
@ -210,7 +210,7 @@ public class SelectChannelEndPointTest
finally finally
{ {
if (_endp.isOpen()) if (_endp.isOpen())
readInterested(); fillInterested();
} }
} }

View File

@ -139,7 +139,7 @@ public class SslConnectionTest
public void onOpen() public void onOpen()
{ {
// System.err.println("onOpen"); // System.err.println("onOpen");
readInterested(); fillInterested();
} }
@Override @Override
@ -149,7 +149,7 @@ public class SslConnectionTest
} }
@Override @Override
public synchronized void onReadable() public synchronized void onFillable()
{ {
AsyncEndPoint endp = getEndPoint(); AsyncEndPoint endp = getEndPoint();
// System.err.println("onReadable "+endp); // System.err.println("onReadable "+endp);
@ -201,7 +201,7 @@ public class SslConnectionTest
finally finally
{ {
if (endp.isOpen()) if (endp.isOpen())
readInterested(); fillInterested();
} }
} }
} }

View File

@ -380,7 +380,7 @@ public abstract class HttpChannel
if (threadName!=null) if (threadName!=null)
Thread.currentThread().setName(threadName); Thread.currentThread().setName(threadName);
if (_state.isUncompleted()) if (_state.isCompleting())
{ {
try try
{ {
@ -809,6 +809,8 @@ public abstract class HttpChannel
protected abstract void execute(Runnable task); protected abstract void execute(Runnable task);
// TODO replace with ScheduledExecutorService?
// TODO constructor inject
public abstract Timer getTimer(); public abstract Timer getTimer();

View File

@ -49,31 +49,31 @@ public class HttpChannelState implements AsyncContext, Continuation
private final static ContinuationThrowable __exception = new ContinuationThrowable(); private final static ContinuationThrowable __exception = new ContinuationThrowable();
// STATES: // STATES:
// handling() suspend() unhandle() resume() complete() doComplete() // handling() suspend() unhandle() resume() complete() doComplete()
// startAsync() dispatch() // startAsync() dispatch()
// IDLE DISPATCHED COMPLETING // IDLE DISPATCHED COMPLETECALLED
// DISPATCHED ASYNCSTARTED UNCOMPLETED // DISPATCHED ASYNCSTARTED UNCOMPLETED
// ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETING // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETECALLED
// REDISPATCHING REDISPATCHED // REDISPATCHING REDISPATCHED
// ASYNCWAIT REDISPATCH COMPLETING // ASYNCWAIT REDISPATCH COMPLETECALLED
// REDISPATCH REDISPATCHED // REDISPATCH REDISPATCHED
// REDISPATCHED ASYNCSTARTED UNCOMPLETED // REDISPATCHED ASYNCSTARTED COMPLETING
// COMPLETING UNCOMPLETED UNCOMPLETED // COMPLETECALLED COMPLETING COMPLETING
// UNCOMPLETED UNCOMPLETED COMPLETED // COMPLETING COMPLETING COMPLETED
// COMPLETED // COMPLETED
public enum State public enum State
{ {
IDLE, // Idle request IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet DISPATCHED, // Request dispatched to filter/servlet
ASYNCSTARTED, // Suspend called, but not yet returned to container ASYNCSTARTED, // Suspend called, but not yet returned to container
REDISPATCHING,// resumed while dispatched REDISPATCHING, // resumed while dispatched
ASYNCWAIT, // Suspended and parked ASYNCWAIT, // Suspended and parked
REDISPATCH, // Has been scheduled REDISPATCH, // Has been scheduled
REDISPATCHED, // Request redispatched to filter/servlet REDISPATCHED, // Request redispatched to filter/servlet
COMPLETING, // complete while dispatched COMPLETECALLED,// complete called
UNCOMPLETED, // Request is completable COMPLETING, // Request is completable
COMPLETED // Request is complete COMPLETED // Request is complete
}; };
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -208,7 +208,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
case ASYNCSTARTED: case ASYNCSTARTED:
case REDISPATCHING: case REDISPATCHING:
case COMPLETING: case COMPLETECALLED:
case ASYNCWAIT: case ASYNCWAIT:
return true; return true;
@ -254,7 +254,7 @@ public class HttpChannelState implements AsyncContext, Continuation
case REDISPATCH: case REDISPATCH:
case REDISPATCHED: case REDISPATCHED:
case REDISPATCHING: case REDISPATCHING:
case COMPLETING: case COMPLETECALLED:
return true; return true;
default: default:
@ -312,11 +312,11 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
return true; return true;
case COMPLETING: case COMPLETECALLED:
_state=State.UNCOMPLETED; _state=State.COMPLETING;
return false; return false;
case UNCOMPLETED: case COMPLETING:
case ASYNCWAIT: case ASYNCWAIT:
return false; return false;
@ -412,7 +412,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
case REDISPATCHED: case REDISPATCHED:
case DISPATCHED: case DISPATCHED:
_state=State.UNCOMPLETED; _state=State.COMPLETING;
return true; return true;
case IDLE: case IDLE:
@ -424,9 +424,9 @@ public class HttpChannelState implements AsyncContext, Continuation
scheduleTimeout(); scheduleTimeout();
if (_state==State.ASYNCWAIT) if (_state==State.ASYNCWAIT)
return true; return true;
else if (_state==State.COMPLETING) else if (_state==State.COMPLETECALLED)
{ {
_state=State.UNCOMPLETED; _state=State.COMPLETING;
return true; return true;
} }
_initial=false; _initial=false;
@ -438,9 +438,9 @@ public class HttpChannelState implements AsyncContext, Continuation
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
return false; return false;
case COMPLETING: case COMPLETECALLED:
_initial=false; _initial=false;
_state=State.UNCOMPLETED; _state=State.COMPLETING;
return true; return true;
default: default:
@ -573,11 +573,11 @@ public class HttpChannelState implements AsyncContext, Continuation
case IDLE: case IDLE:
case ASYNCSTARTED: case ASYNCSTARTED:
_state=State.COMPLETING; _state=State.COMPLETECALLED;
return; return;
case ASYNCWAIT: case ASYNCWAIT:
_state=State.COMPLETING; _state=State.COMPLETECALLED;
dispatch=!_expired; dispatch=!_expired;
break; break;
@ -621,7 +621,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
switch(_state) switch(_state)
{ {
case UNCOMPLETED: case COMPLETING:
_state=State.COMPLETED; _state=State.COMPLETED;
cListeners=_continuationListeners; cListeners=_continuationListeners;
aListeners=_asyncListeners; aListeners=_asyncListeners;
@ -729,25 +729,25 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isCompleting() public boolean isCompleteCalled()
{
synchronized (this)
{
return _state==State.COMPLETECALLED;
}
}
/* ------------------------------------------------------------ */
boolean isCompleting()
{ {
synchronized (this) synchronized (this)
{ {
return _state==State.COMPLETING; return _state==State.COMPLETING;
} }
}
/* ------------------------------------------------------------ */
boolean isUncompleted()
{
synchronized (this)
{
return _state==State.UNCOMPLETED;
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isComplete() public boolean isCompleted()
{ {
synchronized (this) synchronized (this)
{ {
@ -785,7 +785,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
case IDLE: case IDLE:
case DISPATCHED: case DISPATCHED:
case UNCOMPLETED: case COMPLETING:
case COMPLETED: case COMPLETED:
return false; return false;

View File

@ -182,7 +182,7 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
LOG.debug("Opened HTTP Connection {}",this); LOG.debug("Opened HTTP Connection {}",this);
super.onOpen(); super.onOpen();
readInterested(); fillInterested();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -207,7 +207,7 @@ public class HttpConnection extends AbstractAsyncConnection
* the HttpChannel becomes !idle; or the connection has been changed * the HttpChannel becomes !idle; or the connection has been changed
*/ */
@Override @Override
public synchronized void onReadable() public synchronized void onFillable()
{ {
LOG.debug("{} onReadable {}",this,_channel.isIdle()); LOG.debug("{} onReadable {}",this,_channel.isIdle());
@ -232,7 +232,7 @@ public class HttpConnection extends AbstractAsyncConnection
if (filled==0) if (filled==0)
{ {
// Somebody wanted to read, we didn't so schedule another attempt // Somebody wanted to read, we didn't so schedule another attempt
readInterested(); fillInterested();
releaseRequestBuffer(); releaseRequestBuffer();
return; return;
} }
@ -430,7 +430,7 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
// it wants to eat more // it wants to eat more
if (_requestBuffer==null) if (_requestBuffer==null)
readInterested(); fillInterested();
else if (getConnector().isStarted()) else if (getConnector().isStarted())
{ {
LOG.debug("{} pipelined",this); LOG.debug("{} pipelined",this);
@ -440,7 +440,7 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
execute(new Runnable() execute(new Runnable()
{ {
@Override public void run() {onReadable();} @Override public void run() {onFillable();}
}); });
} }
catch(RejectedExecutionException e) catch(RejectedExecutionException e)
@ -742,7 +742,7 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
// Wait until we can read // Wait until we can read
FutureCallback<Void> block=new FutureCallback<>(); FutureCallback<Void> block=new FutureCallback<>();
getEndPoint().readable(null,block); getEndPoint().fillInterested(null,block);
LOG.debug("{} block readable on {}",this,block); LOG.debug("{} block readable on {}",this,block);
block.get(); block.get();

View File

@ -81,7 +81,7 @@ public class ResponseTest
}) })
{ {
@Override @Override
public void onReadable() public void onFillable()
{ {
} }
}; };

View File

@ -44,7 +44,7 @@ public interface Callback<C>
* *
* @param <C> the type of the context object * @param <C> the type of the context object
*/ */
public static class Adapter<C> implements Callback<C> public static class Empty<C> implements Callback<C>
{ {
@Override @Override
public void completed(C context) public void completed(C context)