refactored jetty-io to not have onFillable statemachine. work in progres...

This commit is contained in:
Greg Wilkins 2014-12-18 14:10:06 +01:00
parent 97af3632a1
commit 31e06b5791
4 changed files with 41 additions and 351 deletions

View File

@ -42,7 +42,6 @@ public abstract class AbstractConnection implements Connection
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
@ -58,7 +57,6 @@ public abstract class AbstractConnection implements Connection
_executor = executor;
_readCallback = new ReadCallback();
_dispatchIO = dispatchIO;
_state.set(IDLE);
}
@Override
@ -125,30 +123,7 @@ public abstract class AbstractConnection implements Connection
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested {}",this);
while(true)
{
State state=_state.get();
if (next(state,state.fillInterested()))
break;
}
}
public void fillInterested(Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested {}",this);
while(true)
{
State state=_state.get();
// TODO yuck
if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
break;
State next=new FillingInterestedCallback(callback,state);
if (next(state,next))
break;
}
getEndPoint().fillInterested(_readCallback);
}
/**
@ -175,12 +150,12 @@ public abstract class AbstractConnection implements Connection
if (_endPoint.isOutputShutdown())
_endPoint.close();
else
{
_endPoint.shutdownOutput();
fillInterested();
}
}
}
if (_endPoint.isOpen())
fillInterested();
}
}
/**
@ -257,330 +232,21 @@ public abstract class AbstractConnection implements Connection
@Override
public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
public boolean next(State state, State next)
{
if (next==null)
return true;
if(_state.compareAndSet(state,next))
{
if (LOG.isDebugEnabled())
LOG.debug("{}-->{} {}",state,next,this);
if (next!=state)
next.onEnter(AbstractConnection.this);
return true;
}
return false;
}
private static final class IdleState extends State
{
private IdleState()
{
super("IDLE");
}
@Override
State fillInterested()
{
return FILL_INTERESTED;
}
}
private static final class FillInterestedState extends State
{
private FillInterestedState()
{
super("FILL_INTERESTED");
}
@Override
public void onEnter(AbstractConnection connection)
{
connection.getEndPoint().fillInterested(connection._readCallback);
}
@Override
State fillInterested()
{
return this;
}
@Override
public State onFillable()
{
return FILLING;
}
@Override
State onFailed()
{
return IDLE;
}
}
private static final class RefillingState extends State
{
private RefillingState()
{
super("REFILLING");
}
@Override
State fillInterested()
{
return FILLING_FILL_INTERESTED;
}
@Override
public State onFilled()
{
return IDLE;
}
}
private static final class FillingFillInterestedState extends State
{
private FillingFillInterestedState(String name)
{
super(name);
}
@Override
State fillInterested()
{
return this;
}
State onFilled()
{
return FILL_INTERESTED;
}
}
private static final class FillingState extends State
{
private FillingState()
{
super("FILLING");
}
@Override
public void onEnter(AbstractConnection connection)
{
if (connection.isDispatchIO())
connection.getExecutor().execute(connection._runOnFillable);
else
connection._runOnFillable.run();
}
@Override
State fillInterested()
{
return FILLING_FILL_INTERESTED;
}
@Override
public State onFilled()
{
return IDLE;
}
}
public static class State
{
private final String _name;
State(String name)
{
_name=name;
}
@Override
public String toString()
{
return _name;
}
void onEnter(AbstractConnection connection)
{
}
State fillInterested()
{
throw new IllegalStateException(this.toString());
}
State onFillable()
{
throw new IllegalStateException(this.toString());
}
State onFilled()
{
throw new IllegalStateException(this.toString());
}
State onFailed()
{
throw new IllegalStateException(this.toString());
}
}
public static final State IDLE=new IdleState();
public static final State FILL_INTERESTED=new FillInterestedState();
public static final State FILLING=new FillingState();
public static final State REFILLING=new RefillingState();
public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
public class NestedState extends State
{
private final State _nested;
NestedState(State nested)
{
super("NESTED("+nested+")");
_nested=nested;
}
NestedState(String name,State nested)
{
super(name+"("+nested+")");
_nested=nested;
}
@Override
State fillInterested()
{
return new NestedState(_nested.fillInterested());
}
@Override
State onFillable()
{
return new NestedState(_nested.onFillable());
}
@Override
State onFilled()
{
return new NestedState(_nested.onFilled());
}
}
public class FillingInterestedCallback extends NestedState
{
private final Callback _callback;
FillingInterestedCallback(Callback callback,State nested)
{
super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
_callback=callback;
}
@Override
void onEnter(final AbstractConnection connection)
{
Callback callback=new Callback()
{
@Override
public void succeeded()
{
while(true)
{
State state = connection._state.get();
if (!(state instanceof NestedState))
break;
State nested=((NestedState)state)._nested;
if (connection.next(state,nested))
break;
}
_callback.succeeded();
}
@Override
public void failed(Throwable x)
{
while(true)
{
State state = connection._state.get();
if (!(state instanceof NestedState))
break;
State nested=((NestedState)state)._nested;
if (connection.next(state,nested))
break;
}
_callback.failed(x);
}
};
connection.getEndPoint().fillInterested(callback);
}
}
private final Runnable _runOnFillable = new Runnable()
{
@Override
public void run()
{
try
{
onFillable();
}
finally
{
while(true)
{
State state=_state.get();
if (next(state,state.onFilled()))
break;
}
}
}
};
private class ReadCallback implements Callback
{
@Override
public void succeeded()
{
while(true)
{
State state=_state.get();
if (next(state,state.onFillable()))
break;
}
onFillable();
}
@Override
public void failed(final Throwable x)
{
_executor.execute(new Runnable()
{
@Override
public void run()
{
while(true)
{
State state=_state.get();
if (next(state,state.onFailed()))
break;
}
onFillInterestedFailed(x);
}
});
onFillInterestedFailed(x);
}
@Override

View File

@ -869,7 +869,7 @@ public class SslConnection extends AbstractConnection
{
_sslEngine.closeOutbound();
flush(BufferUtil.EMPTY_BUFFER); // Send close handshake
SslConnection.this.fillInterested(); // seek reply FIN or RST or close handshake
// TODO SslConnection.this.fillInterested(); // seek reply FIN or RST or close handshake
}
catch (Exception e)
{
@ -888,10 +888,10 @@ public class SslConnection extends AbstractConnection
@Override
public void close()
{
super.close();
// First send the TLS Close Alert, then the FIN
shutdownOutput();
getEndPoint().close();
super.close();
}
@Override

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -116,6 +117,7 @@ public class SelectChannelEndPointTest
public class TestConnection extends AbstractConnection
{
volatile FutureCallback _blockingRead;
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1;
@ -133,8 +135,29 @@ public class SelectChannelEndPointTest
}
@Override
public synchronized void onFillable()
public void onFillInterestedFailed(Throwable cause)
{
Callback blocking = _blockingRead;
if (blocking!=null)
{
_blockingRead=null;
blocking.failed(cause);
return;
}
super.onFillInterestedFailed(cause);
}
@Override
public void onFillable()
{
Callback blocking = _blockingRead;
if (blocking!=null)
{
_blockingRead=null;
blocking.succeeded();
return;
}
EndPoint _endp = getEndPoint();
try
{
@ -155,9 +178,9 @@ public class SelectChannelEndPointTest
// If the tests wants to block, then block
while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt)
{
FutureCallback blockingRead = new FutureCallback();
fillInterested(blockingRead);
blockingRead.get();
FutureCallback future = _blockingRead = new FutureCallback();
fillInterested();
future.get();
filled = _endp.fill(_in);
progress |= filled > 0;
}
@ -184,6 +207,9 @@ public class SelectChannelEndPointTest
if (_endp.isInputShutdown())
_endp.shutdownOutput();
}
if (_endp.isOpen())
fillInterested();
}
catch (ExecutionException e)
{
@ -210,8 +236,6 @@ public class SelectChannelEndPointTest
}
finally
{
if (_endp.isOpen())
fillInterested();
}
}
}

View File

@ -113,7 +113,7 @@ public class TimerScheduler extends AbstractLifeCycle implements Scheduler, Runn
}
catch (Throwable x)
{
LOG.debug("Exception while executing task " + _task, x);
LOG.warn("Exception while executing task " + _task, x);
}
}