Implemented dispatch to the application using an ExecutionStrategy.

Now instead of having the channel to dispatch when it detects that it
has to call the application (upon receiving a HEADERS frame, or upon
a push "fake" request), now the whole mechanism is controlled by an
ExecutionStrategy.
This commit is contained in:
Simone Bordet 2014-12-18 15:12:18 +01:00
parent 7f98b64658
commit e35c51eb7d
6 changed files with 259 additions and 194 deletions

View File

@ -20,23 +20,29 @@ package org.eclipse.jetty.http2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2Connection extends AbstractConnection
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
private final ByteBufferPool byteBufferPool;
private final Parser parser;
private final ISession session;
private final int bufferSize;
private final ExecutionStrategy executionStrategy; // TODO: make it pluggable from outside
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, boolean dispatchIO)
{
@ -45,6 +51,7 @@ public class HTTP2Connection extends AbstractConnection
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.executionStrategy = new ExecutionStrategy.Iterative(new HTTP2Producer(), executor);
}
protected ISession getSession()
@ -72,33 +79,7 @@ public class HTTP2Connection extends AbstractConnection
@Override
public void onFillable()
{
ByteBuffer buffer = byteBufferPool.acquire(bufferSize, false);
boolean readMore = read(buffer) == 0;
byteBufferPool.release(buffer);
if (readMore)
fillInterested();
}
protected int read(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
while (true)
{
int filled = fill(endPoint, buffer);
if (filled == 0)
{
return 0;
}
else if (filled < 0)
{
session.onShutdown();
return -1;
}
else
{
parser.parse(buffer);
}
}
executionStrategy.produce();
}
private int fill(EndPoint endPoint, ByteBuffer buffer)
@ -124,4 +105,82 @@ public class HTTP2Connection extends AbstractConnection
session.onIdleTimeout();
return false;
}
protected void offerTask(Runnable task)
{
tasks.offer(task);
}
private class HTTP2Producer implements ExecutionStrategy.Producer
{
private ByteBuffer buffer;
@Override
public Runnable produce()
{
Runnable task = tasks.poll();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task);
if (task != null)
return task;
boolean looping = false;
while (true)
{
if (buffer == null)
buffer = byteBufferPool.acquire(bufferSize, false);
if (looping)
{
while (buffer.hasRemaining())
{
if (parser.parse(buffer))
break;
}
task = tasks.poll();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task);
if (task != null)
{
release();
return task;
}
}
int filled = fill(getEndPoint(), buffer);
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes", filled);
if (filled == 0)
{
fillInterested();
release();
return null;
}
else if (filled < 0)
{
session.onShutdown();
release();
return null;
}
looping = true;
}
}
private void release()
{
if (BufferUtil.isEmpty(buffer))
{
byteBufferPool.release(buffer);
buffer = null;
}
}
@Override
public void onProductionComplete()
{
}
}
}

View File

@ -21,10 +21,10 @@ package org.eclipse.jetty.http2.server;
import java.util.Queue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.parser.Parser;
@ -34,13 +34,13 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
class HTTP2ServerConnection extends HTTP2Connection
public class HTTP2ServerConnection extends HTTP2Connection
{
private final ServerSessionListener listener;
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
this.listener = listener;
@ -51,10 +51,10 @@ class HTTP2ServerConnection extends HTTP2Connection
public void onOpen()
{
super.onOpen();
notifyConnect(getSession());
notifyAccept(getSession());
}
private void notifyConnect(ISession session)
private void notifyAccept(ISession session)
{
try
{
@ -66,41 +66,51 @@ class HTTP2ServerConnection extends HTTP2Connection
}
}
public HttpChannelOverHTTP2 newHttpChannelOverHTTP2(Connector connector, Stream stream)
public void onNewStream(Connector connector, IStream stream, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame);
offerTask(task);
}
public void onPush(Connector connector, IStream stream, MetaData.Request request)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing push {} on {}", request, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request);
offerTask(task);
}
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{
HttpChannelOverHTTP2 channel = channels.poll();
if (channel!=null)
if (channel != null)
{
channel.getHttp2Transport().setStream((IStream)stream);
channel.getHttpTransport().setStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("recycled :{}/{}",channel,this);
LOG.debug("Recycling channel {} for {}", channel, this);
}
else
{
channel = new HttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), new HttpTransportOverHTTP2(connector, httpConfig, getEndPoint(), (IStream)stream))
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream);
channel = new HttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport)
{
@Override
public void onCompleted()
{
super.onCompleted();
recycle();
channels.add(this);
channels.offer(this);
}
};
if (LOG.isDebugEnabled())
LOG.debug("new :{}/{}",channel,this);
LOG.debug("Creating channel {} for {}", channel, this);
}
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
return channel;
}
public boolean onNewStream(Connector connector, Stream stream, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = newHttpChannelOverHTTP2(connector,stream);
channel.onRequest(frame);
return frame.isEndStream() ? false : true;
}
}

View File

@ -20,9 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.IStream;
@ -65,7 +63,6 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return new ServerParser(byteBufferPool, listener, getMaxDynamicTableSize(), getHttpConfiguration().getRequestHeaderSize());
}
@Override
public boolean isAcceptable(String protocol, String tlsProtocol, String tlsCipher)
{
@ -76,7 +73,6 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return true;
}
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
{
private final Connector connector;
@ -103,7 +99,8 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return ((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector,stream,frame)?this:null;
((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector, (IStream)stream, frame);
return frame.isEndStream() ? null : this;
}
@Override

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@ -29,6 +28,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -46,8 +46,8 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverHTTP2 extends HttpChannel
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION=new PreEncodedHttpField(HttpHeader.SERVER,HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY=new PreEncodedHttpField(HttpHeader.X_POWERED_BY,HttpConfiguration.SERVER_VERSION);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
private boolean _expect100Continue = false;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
@ -55,20 +55,25 @@ public class HttpChannelOverHTTP2 extends HttpChannel
super(connector, configuration, endPoint, transport, new HttpInputOverHTTP2());
}
private IStream getStream()
{
return getHttpTransport().getStream();
}
@Override
public boolean isExpecting100Continue()
{
return _expect100Continue;
}
public void onRequest(HeadersFrame frame)
public Runnable onRequest(HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
HttpFields fields = request.getFields();
_expect100Continue = fields.contains(HttpHeader.EXPECT,HttpHeaderValue.CONTINUE.asString());
_expect100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
HttpFields response=getResponse().getHttpFields();
HttpFields response = getResponse().getHttpFields();
if (getHttpConfiguration().getSendServerVersion())
response.add(SERVER_VERSION);
if (getHttpConfiguration().getSendXPoweredBy())
@ -81,41 +86,44 @@ public class HttpChannelOverHTTP2 extends HttpChannel
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getVersion(),
System.lineSeparator(), fields);
}
execute(this);
return this;
}
public void onPushRequest(MetaData.Request request)
public Runnable onPushRequest(MetaData.Request request)
{
onRequest(request);
getRequest().setAttribute("org.eclipse.jetty.pushed",Boolean.TRUE);
getRequest().setAttribute("org.eclipse.jetty.pushed", Boolean.TRUE);
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
Stream stream = getStream();
LOG.debug("HTTP2 PUSH Request #{}/{}:{}{} {} {}{}{}",
stream.getId(),Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getVersion(),
System.lineSeparator(), request.getFields());
}
execute(this);
return this;
}
public HttpTransportOverHTTP2 getHttp2Transport()
@Override
public HttpTransportOverHTTP2 getHttpTransport()
{
return (HttpTransportOverHTTP2)getHttpTransport();
return (HttpTransportOverHTTP2)super.getHttpTransport();
}
@Override
public void recycle()
{
super.recycle();
getHttp2Transport().recycle();
getHttpTransport().recycle();
}
@Override
@ -124,9 +132,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel
super.commit(info);
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
Stream stream = getStream();
LOG.debug("HTTP2 Commit Response #{}/{}:{}{} {} {}{}{}",
stream.getId(),Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getVersion(), info.getStatus(), info.getReason(),
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
}
}
@ -142,8 +150,8 @@ public class HttpChannelOverHTTP2 extends HttpChannel
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", stream.getId(),Integer.toHexString(stream.getSession().hashCode()), copy.remaining());
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), copy.remaining());
}
onContent(new HttpInput.Content(copy)

View File

@ -30,9 +30,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -47,29 +45,36 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback();
private final Connector connector;
private final HttpConfiguration httpConfiguration;
private final EndPoint endPoint;
private final HTTP2ServerConnection connection;
private IStream stream;
public HttpTransportOverHTTP2(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint, IStream stream)
public HttpTransportOverHTTP2(Connector connector, HTTP2ServerConnection connection)
{
this.connector = connector;
this.httpConfiguration = httpConfiguration;
this.endPoint = endPoint;
this.connection = connection;
}
public IStream getStream()
{
return stream;
}
public void setStream(IStream stream)
{
if (LOG.isDebugEnabled())
LOG.debug("{} setStream {}", this, stream.getId());
this.stream = stream;
}
public void recycle()
{
this.stream=null;
this.stream = null;
commit.set(false);
}
@Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer content, boolean lastContent, Callback callback)
{
boolean isHeadRequest = head;
// info != null | content != 0 | last = true => commit + send/end
// info != null | content != 0 | last = false => commit + send
// info != null | content == 0 | last = true => commit/end
@ -134,9 +139,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void succeeded(Stream pushStream)
{
HTTP2ServerConnection connection = (HTTP2ServerConnection)endPoint.getConnection();
HttpChannelOverHTTP2 channel = connection.newHttpChannelOverHTTP2(connector,pushStream);
channel.onPushRequest(request);
connection.onPush(connector, (IStream)pushStream, request);
}
@Override
@ -172,7 +175,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.data(frame, callback);
}
@Override
public void completed()
{
@ -203,17 +205,4 @@ public class HttpTransportOverHTTP2 implements HttpTransport
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
}
}
public void setStream(IStream stream)
{
if (LOG.isDebugEnabled())
LOG.debug("{} setStream {}",this, stream.getId());
this.stream=stream;
}
public Stream getStream()
{
return stream;
}
}

View File

@ -16,15 +16,13 @@
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/** Strategies to execute Producers
/**
* Strategies to execute Producers
*/
public abstract class ExecutionStrategy
{
@ -32,6 +30,7 @@ public abstract class ExecutionStrategy
{
/**
* Produce a task to run
*
* @return A task to run or null if we are complete.
*/
Runnable produce();
@ -47,21 +46,21 @@ public abstract class ExecutionStrategy
protected ExecutionStrategy(Producer producer, Executor executor)
{
_producer=producer;
_executor=executor;
_producer = producer;
_executor = executor;
}
public abstract void produce();
/* ------------------------------------------------------------ */
/** Simple iterative strategy.
/**
* Simple iterative strategy.
* Iterate over production until complete and execute each task.
*/
public static class Iterative extends ExecutionStrategy
{
public Iterative(Producer producer, Executor executor)
{
super(producer,executor);
super(producer, executor);
}
public void produce()
@ -72,9 +71,9 @@ public abstract class ExecutionStrategy
while (true)
{
// produce a task
Runnable task=_producer.produce();
Runnable task = _producer.produce();
if (task==null)
if (task == null)
break;
// execute the task
@ -88,11 +87,10 @@ public abstract class ExecutionStrategy
}
}
/* ------------------------------------------------------------ */
/**
* A Strategy that allows threads to run the tasks that they have produced,
* so execution is done with a hot cache (ie threads eat what they kill).
* <p>
* <p/>
* The phrase 'eat what you kill' comes from the hunting ethic that says a person
* shouldnt kill anything he or she doesnt plan on eating. It was taken up in its
* more general sense by lawyers, who used it to mean that an individuals earnings
@ -107,9 +105,32 @@ public abstract class ExecutionStrategy
*/
public static class EatWhatYouKill extends ExecutionStrategy implements Runnable
{
private enum State {IDLE,PRODUCING,PENDING,PRODUCING_PENDING};
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public EatWhatYouKill(Producer producer, Executor executor)
{
super(producer, executor);
}
public void produce()
{
while (true)
{
State state = _state.get();
switch (state)
{
case IDLE:
if (!_state.compareAndSet(state, State.PENDING))
continue;
run();
return;
default:
return;
}
}
}
@Override
public void run()
{
@ -121,10 +142,10 @@ public abstract class ExecutionStrategy
while (true)
{
// If we got here, then we are the thread that is producing
Runnable task=_producer.produce();
Runnable task = _producer.produce();
// If no task was produced
if (task==null)
if (task == null)
{
// If we are the thread that sets idle
if (tryIdle())
@ -149,13 +170,13 @@ public abstract class ExecutionStrategy
private boolean tryProducing()
{
while(true)
while (true)
{
State state=_state.get();
switch(state)
State state = _state.get();
switch (state)
{
case PENDING:
if (!_state.compareAndSet(state,State.PRODUCING_PENDING))
if (!_state.compareAndSet(state, State.PRODUCING_PENDING))
continue;
return true;
@ -167,17 +188,17 @@ public abstract class ExecutionStrategy
private boolean clearProducingTryPending()
{
while(true)
while (true)
{
State state=_state.get();
switch(state)
State state = _state.get();
switch (state)
{
case PRODUCING:
if (!_state.compareAndSet(state,State.PENDING))
if (!_state.compareAndSet(state, State.PENDING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.PENDING))
if (!_state.compareAndSet(state, State.PENDING))
continue;
return false;
default:
@ -188,21 +209,21 @@ public abstract class ExecutionStrategy
private boolean clearPendingTryProducing()
{
while(true)
while (true)
{
State state=_state.get();
switch(state)
State state = _state.get();
switch (state)
{
case IDLE:
return false;
case PENDING:
if (!_state.compareAndSet(state,State.PRODUCING))
if (!_state.compareAndSet(state, State.PRODUCING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.PRODUCING))
if (!_state.compareAndSet(state, State.PRODUCING))
continue;
return false; // Another thread is already producing
@ -214,14 +235,14 @@ public abstract class ExecutionStrategy
private boolean tryIdle()
{
while(true)
while (true)
{
State state=_state.get();
switch(state)
State state = _state.get();
switch (state)
{
case PRODUCING:
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.IDLE))
if (!_state.compareAndSet(state, State.IDLE))
continue;
return true;
default:
@ -230,28 +251,9 @@ public abstract class ExecutionStrategy
}
}
public EatWhatYouKill(Producer producer, Executor executor)
private enum State
{
super(producer,executor);
}
public void produce()
{
while(true)
{
State state=_state.get();
switch(state)
{
case IDLE:
if (!_state.compareAndSet(state,State.PENDING))
continue;
run();
return;
default:
return;
}
}
IDLE, PRODUCING, PENDING, PRODUCING_PENDING
}
}
}