jetty-9 optimisation to dispatch before parsing so that handling is done in same thread

This commit is contained in:
Greg Wilkins 2012-12-14 09:50:22 +11:00
parent 5b8a9bb95e
commit 6bfc19be1b
6 changed files with 89 additions and 102 deletions

View File

@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -39,6 +38,8 @@ import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
public static final boolean EXECUTE_ONFILLABLE=true;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
@ -46,82 +47,22 @@ public abstract class AbstractConnection implements Connection
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback _readCallback;
private final boolean _executeOnfillable;
private int _inputBufferSize=2048;
public AbstractConnection(EndPoint endp, Executor executor)
protected AbstractConnection(EndPoint endp, Executor executor)
{
this(endp,executor,true);
this(endp,executor,EXECUTE_ONFILLABLE);
}
public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_executor = executor;
_readCallback = new ExecutorCallback(executor,0)
{
@Override
public void succeeded()
{
if (executeOnfillable)
super.succeeded();
else
onCompleted();
}
@Override
protected void onCompleted()
{
if (_state.compareAndSet(State.INTERESTED,State.FILLING))
{
try
{
onFillable();
}
finally
{
loop:while(true)
{
switch(_state.get())
{
case IDLE:
case INTERESTED:
throw new IllegalStateException();
case FILLING:
if (_state.compareAndSet(State.FILLING,State.IDLE))
break loop;
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
{
getEndPoint().fillInterested(_readCallback);
break loop;
}
break;
}
}
}
}
else
LOG.warn(new Throwable());
}
@Override
protected void onFailed(Throwable x)
{
onFillInterestedFailed(x);
}
@Override
public String toString()
{
return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
}
};
_readCallback = new ReadCallback();
_executeOnfillable=executeOnfillable;
}
@Override
@ -216,8 +157,6 @@ public abstract class AbstractConnection implements Connection
return true;
}
// TODO remove this when open/close refactored
final AtomicReference<Throwable> _opened = new AtomicReference<>(null);
@Override
public void onOpen()
{
@ -225,12 +164,6 @@ public abstract class AbstractConnection implements Connection
for (Listener listener : listeners)
listener.onOpened(this);
if (!_opened.compareAndSet(null,new Throwable()))
{
LOG.warn("ALREADY OPENED ", _opened.get());
LOG.warn("EXTRA OPEN AT ",new Throwable());
}
}
@Override
@ -294,4 +227,67 @@ public abstract class AbstractConnection implements Connection
{
IDLE, INTERESTED, FILLING, FILLING_INTERESTED
}
private class ReadCallback implements Callback, Runnable
{
@Override
public void run()
{
if (_state.compareAndSet(State.INTERESTED,State.FILLING))
{
try
{
onFillable();
}
finally
{
loop:while(true)
{
switch(_state.get())
{
case IDLE:
case INTERESTED:
throw new IllegalStateException();
case FILLING:
if (_state.compareAndSet(State.FILLING,State.IDLE))
break loop;
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
{
getEndPoint().fillInterested(_readCallback);
break loop;
}
break;
}
}
}
}
else
LOG.warn(new Throwable());
}
@Override
public void succeeded()
{
if (_executeOnfillable)
_executor.execute(this);
else
run();
}
@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}
@Override
public String toString()
{
return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
}
};
}

View File

@ -103,7 +103,9 @@ public class SslConnection extends AbstractConnection
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine)
{
super(endPoint, executor);
// This connection does not execute calls to onfillable, so they will be called by the selector thread.
// onfillable does not block and will only wakeup another thread to do the actual reading and handling.
super(endPoint, executor, !EXECUTE_ONFILLABLE);
this._bufferPool = byteBufferPool;
this._sslEngine = sslEngine;
this._decryptedEndPoint = newDecryptedEndPoint();

View File

@ -246,7 +246,7 @@ public class SslConnectionTest
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(1, _dispatches.get());
Assert.assertEquals(0, _dispatches.get());
client.close();
}

View File

@ -66,24 +66,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private BlockingCallback _readBlocker = new BlockingCallback();
private BlockingCallback _writeBlocker = new BlockingCallback();
// TODO get rid of this
private final Runnable _channelRunner = new Runnable()
{
@Override
public void run()
{
try
{
setCurrentConnection(HttpConnection.this);
_channel.run();
}
finally
{
setCurrentConnection(null);
}
}
};
public static HttpConnection getCurrentConnection()
{
@ -102,10 +84,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
// Tell AbstractConnector executeOnFillable==false because we are guaranteeing that onfillable
// will never block nor take an excessive amount of CPU. ie it is OK for the selector thread to
// be used. In this case the thread that calls onfillable will be asked to do some IO and parsing.
super(endPoint, connector.getExecutor(),false);
// Tell AbstractConnector executeOnFillable==true because we want the same thread that
// does the HTTP parsing to handle the request so its cache is hot
super(endPoint, connector.getExecutor(),true);
_config = config;
_connector = connector;
@ -280,7 +261,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
getExecutor().execute(_channelRunner);
try
{
setCurrentConnection(HttpConnection.this);
_channel.run();
}
finally
{
setCurrentConnection(null);
}
return;
}
}

View File

@ -56,7 +56,7 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id
// always dispatches to a new thread when calling application
// code, so here we can safely pass false as last parameter,
// and avoid to dispatch to onFillable().
super(endPoint, executor, false);
super(endPoint, executor, !EXECUTE_ONFILLABLE);
this.bufferPool = bufferPool;
this.parser = parser;
onIdle(true);

View File

@ -318,7 +318,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor);
super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specially with MUX
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);