jetty-9 miscillaneous optimizations: donot dispatch to HTTP and SPDY; improved executorCallback

This commit is contained in:
Greg Wilkins 2012-10-02 13:48:51 -07:00
parent ef2939b86c
commit 6cc0734a1a
9 changed files with 144 additions and 178 deletions

View File

@ -46,21 +46,30 @@ public abstract class AbstractConnection implements Connection
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback<Void> _readCallback;
private int _inputBufferSize=8192;
private int _inputBufferSize=2048;
public AbstractConnection(EndPoint endp, Executor executor)
{
this(endp, executor, true);
this(endp,executor,true);
}
public AbstractConnection(EndPoint endp, Executor executor, final boolean dispatchCompletion)
public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_executor = executor;
_readCallback = new ExecutorCallback<Void>(executor)
_readCallback = new ExecutorCallback<Void>(executor,0)
{
@Override
public void completed(Void context)
{
if (executeOnfillable)
super.completed(context);
else
onCompleted(context);
}
@Override
protected void onCompleted(Void context)
{
@ -107,16 +116,10 @@ public abstract class AbstractConnection implements Connection
onFillInterestedFailed(x);
}
@Override
protected boolean alwaysDispatchCompletion()
{
return dispatchCompletion;
}
@Override
public String toString()
{
return String.format("AC.ReadCB@%x", AbstractConnection.this.hashCode());
return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
}
};
}
@ -137,11 +140,11 @@ public abstract class AbstractConnection implements Connection
_inputBufferSize = inputBufferSize;
}
public Executor getExecutor()
protected Executor getExecutor()
{
return _executor;
}
/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
@ -186,7 +189,7 @@ public abstract class AbstractConnection implements Connection
* <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
* @param cause the exception that caused the failure
*/
public void onFillInterestedFailed(Throwable cause)
protected void onFillInterestedFailed(Throwable cause)
{
LOG.debug("{} onFillInterestedFailed {}", this, cause);
if (_endPoint.isOpen())

View File

@ -29,6 +29,8 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket;
@ -57,7 +59,18 @@ public class SslConnectionTest
private volatile boolean _testFill=true;
private volatile FutureCallback<Void> _writeCallback;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
final AtomicInteger _dispatches = new AtomicInteger();
protected QueuedThreadPool _threadPool = new QueuedThreadPool()
{
@Override
public boolean dispatch(Runnable job)
{
_dispatches.incrementAndGet();
return super.dispatch(job);
}
};
protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager()
{
@ -113,6 +126,7 @@ public class SslConnectionTest
_threadPool.start();
_scheduler.start();
_manager.start();
}
@After
@ -132,7 +146,7 @@ public class SslConnectionTest
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
super(endp, _threadPool,false);
}
@Override
@ -228,12 +242,19 @@ public class SslConnectionTest
server.configureBlocking(false);
_manager.accept(server);
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
client.getOutputStream().write("Hello".getBytes("UTF-8"));
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals(10, len);
Assert.assertEquals("HelloWorld",new String(buffer,0,len,StringUtil.__UTF8_CHARSET));
Assert.assertEquals(5, len);
Assert.assertEquals("Hello",new String(buffer,0,len,StringUtil.__UTF8_CHARSET));
_dispatches.set(0);
client.getOutputStream().write("World".getBytes("UTF-8"));
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(1, _dispatches.get());
client.close();
}

View File

@ -75,7 +75,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public HttpConnection(HttpChannelConfig config, Connector connector, EndPoint endPoint)
{
super(endPoint, connector.getExecutor());
// Tell AbstractConnector executeOnFillable==false because we are guaranteeing that onfillable
// will never block nor take an excessive amount of CPU. ie it is OK for the selector thread to
// be used. In this case the thread that calls onfillable will be asked to do some IO and parsing.
super(endPoint, connector.getExecutor(),false);
_config = config;
_connector = connector;
@ -193,10 +196,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
while (true)
{
// Can the parser progress (even with an empty buffer)
boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// If there is a request buffer, we are re-entering here
if (!event && BufferUtil.isEmpty(_requestBuffer))
if (!call_channel && BufferUtil.isEmpty(_requestBuffer))
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), false);
@ -232,11 +235,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
// Parse what we have read
event=_parser.parseNext(_requestBuffer);
call_channel=_parser.parseNext(_requestBuffer);
}
// Parse the buffer
if (event)
if (call_channel)
{
// Parse as much content as there is available before calling the channel
// this is both efficient (may queue many chunks), will correctly set available for 100 continues
@ -250,25 +253,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
_channel.run();
// Return if the channel is still processing the request
if (_channel.getState().isSuspending())
{
// release buffer if no input being held.
// This is needed here to handle the case of no request input. If there
// is request input, then the release is handled by Input@onAllContentConsumed()
if (_channel.getRequest().getHttpInput().available()==0)
releaseRequestBuffer();
return;
}
// return if the connection has been changed
if (getEndPoint().getConnection()!=this)
{
releaseRequestBuffer();
return;
}
getExecutor().execute(_channel);
return;
}
}
}
@ -457,49 +443,45 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
reset();
// Is this thread dispatched from a resume ?
if (getCurrentConnection() != HttpConnection.this)
if (_parser.isStart())
{
if (_parser.isStart())
// it wants to eat more
if (_requestBuffer == null)
{
// it wants to eat more
if (_requestBuffer == null)
{
fillInterested();
}
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
fillInterested();
}
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
else
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
getEndPoint().close();
}
}
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
}
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
@ -537,7 +519,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// Do we have content ready to parse?
if (BufferUtil.isEmpty(_requestBuffer))
{
{
// If no more input
if (getEndPoint().isInputShutdown())
{
@ -553,7 +535,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// We will need a buffer to read into
if (_requestBuffer==null)
_requestBuffer=_bufferPool.acquire(getInputBufferSize(),false);
{
long content_length=_channel.getRequest().getContentLength();
int size=getInputBufferSize();
if (size<content_length)
size=size*4; // TODO tune this
_requestBuffer=_bufferPool.acquire(size,false);
}
// read some data
int filled=getEndPoint().fill(_requestBuffer);

View File

@ -260,13 +260,6 @@ public class Holder<T> extends AbstractLifeCycle implements Dumpable
return _asyncSupported;
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return _name;
}
/* ------------------------------------------------------------ */
protected void illegalStateIfContextStarted()
{
@ -282,7 +275,7 @@ public class Holder<T> extends AbstractLifeCycle implements Dumpable
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(_name).append("==").append(_className)
out.append(toString())
.append(" - ").append(AbstractLifeCycle.getState(this)).append("\n");
ContainerLifeCycle.dump(out,indent,_initParams.entrySet());
}
@ -294,6 +287,13 @@ public class Holder<T> extends AbstractLifeCycle implements Dumpable
return ContainerLifeCycle.dump(this);
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x==%s",_name,hashCode(),_className);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */

View File

@ -178,15 +178,10 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
*/
public void setInitOrder(int order)
{
_initOnStartup=true;
_initOnStartup=order>0;
_initOrder = order;
}
public boolean isSetInitOrder()
{
return _initOnStartup;
}
/* ------------------------------------------------------------ */
/** Comparitor by init order.
*/
@ -931,4 +926,12 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
throw se;
}
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x==%s,%d,%b",_name,hashCode(),_className,_initOrder,_servlet!=null);
}
}

View File

@ -119,8 +119,6 @@ public class ServletContextHandlerTest
assertEquals(2,__testServlets.get());
assertThat(holder0.getServletInstance(),nullValue());
response =_connector.getResponses("GET /test0 HTTP/1.0\r\n\r\n");
assertThat(response,containsString("200 OK"));
@ -129,6 +127,8 @@ public class ServletContextHandlerTest
_server.stop();
assertEquals(0,__testServlets.get());
holder0.setInitOrder(0);
_server.start();
assertEquals(2,__testServlets.get());
assertThat(holder0.getServletInstance(),nullValue());

View File

@ -52,7 +52,8 @@ public class SPDYConnection extends AbstractConnection implements Controller<Sta
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor,int bufferSize)
{
super(endPoint, executor);
// TODO explain why we are passing false here
super(endPoint, executor, false);
this.bufferPool = bufferPool;
this.parser = parser;
onIdle(true);

View File

@ -18,85 +18,9 @@
package org.eclipse.jetty.spdy;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Callback;
/**
* <p>A {@link Promise} is a {@link Future} that allows a result or a failure to be set,
* so that the {@link Future} will be {@link #isDone() done}.</p>
*
* @param <T> the type of the result object
*/
public class Promise<T> implements Callback<T>, Future<T>
@Deprecated
public class Promise<T> extends FutureCallback<T>
{
private final CountDownLatch latch = new CountDownLatch(1);
private boolean cancelled;
private Throwable failure;
private T promise;
@Override
public void completed(T result)
{
this.promise = result;
latch.countDown();
}
@Override
public void failed(T context, Throwable x)
{
this.failure = x;
latch.countDown();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
cancelled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled()
{
return cancelled;
}
@Override
public boolean isDone()
{
return cancelled || latch.getCount() == 0;
}
@Override
public T get() throws InterruptedException, ExecutionException
{
latch.await();
return result();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
boolean elapsed = !latch.await(timeout, unit);
if (elapsed)
throw new TimeoutException();
return result();
}
private T result() throws ExecutionException
{
if (isCancelled())
throw new CancellationException();
Throwable failure = this.failure;
if (failure != null)
throw new ExecutionException(failure);
return promise;
}
}

View File

@ -24,6 +24,14 @@ public abstract class ExecutorCallback<C> implements Callback<C>
{
private final ForkInvoker<C> _invoker;
private final Executor _executor;
private final Runnable _onComplete=new Runnable()
{
@Override
public void run()
{
onCompleted(null);
}
};
public ExecutorCallback(Executor executor)
{
@ -33,14 +41,32 @@ public abstract class ExecutorCallback<C> implements Callback<C>
public ExecutorCallback(Executor executor, int maxRecursion)
{
_executor = executor;
_invoker = new ExecutorCallbackInvoker(maxRecursion);
_invoker = maxRecursion>0?new ExecutorCallbackInvoker(maxRecursion):null;
if (_executor==null)
throw new IllegalArgumentException();
}
@Override
public final void completed(final C context)
public void completed(final C context)
{
// Should we execute?
if (alwaysDispatchCompletion())
if (_invoker==null)
{
if (context==null)
_executor.execute(_onComplete);
else
{
_executor.execute(new Runnable()
{
@Override
public void run()
{
onCompleted(context);
}
});
}
}
else if (alwaysDispatchCompletion())
{
_invoker.fork(context);
}
@ -53,7 +79,7 @@ public abstract class ExecutorCallback<C> implements Callback<C>
protected abstract void onCompleted(C context);
@Override
public final void failed(final C context, final Throwable x)
public void failed(final C context, final Throwable x)
{
// Always execute failure
Runnable runnable = new Runnable()