Improved HttpChannelOverHTTP2 recycling
This commit is contained in:
parent
0f4a4cdac6
commit
7b41e78f74
|
@ -68,7 +68,7 @@ public class AbstractTest
|
|||
|
||||
protected void startServer(ServerSessionListener listener) throws Exception
|
||||
{
|
||||
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
|
||||
prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(),listener));
|
||||
prepareClient();
|
||||
server.start();
|
||||
client.start();
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.eclipse.jetty.io.Connection;
|
|||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.AbstractConnectionFactory;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.util.annotation.Name;
|
||||
|
||||
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
|
||||
{
|
||||
|
@ -37,10 +39,12 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
private int maxDynamicTableSize = 4096;
|
||||
private int initialStreamWindow = FlowControl.DEFAULT_WINDOW_SIZE;
|
||||
private int maxConcurrentStreams = -1;
|
||||
private final HttpConfiguration httpConfiguration;
|
||||
|
||||
public AbstractHTTP2ServerConnectionFactory()
|
||||
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
|
||||
{
|
||||
super("h2-15","h2-14");
|
||||
this.httpConfiguration = httpConfiguration;
|
||||
}
|
||||
|
||||
public boolean isDispatchIO()
|
||||
|
@ -83,6 +87,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
this.maxConcurrentStreams = maxConcurrentStreams;
|
||||
}
|
||||
|
||||
public HttpConfiguration getHttpConfiguration()
|
||||
{
|
||||
return httpConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection newConnection(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
|
@ -100,7 +109,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
|
||||
Parser parser = newServerParser(connector.getByteBufferPool(), session);
|
||||
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
|
||||
endPoint, parser, session, getInputBufferSize(), isDispatchIO(), listener);
|
||||
endPoint, httpConfiguration, parser, session, getInputBufferSize(), isDispatchIO(), listener);
|
||||
|
||||
return configure(connection, connector, endPoint);
|
||||
}
|
||||
|
|
|
@ -18,25 +18,33 @@
|
|||
|
||||
package org.eclipse.jetty.http2.server;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.http2.HTTP2Connection;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.parser.Parser;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
||||
|
||||
class HTTP2ServerConnection extends HTTP2Connection
|
||||
{
|
||||
// TODO the recycle pool of HttpChannelOverHTTP2 should be here
|
||||
|
||||
private final ServerSessionListener listener;
|
||||
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>();
|
||||
private final HttpConfiguration httpConfig;
|
||||
|
||||
HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
|
||||
HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
|
||||
{
|
||||
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
|
||||
this.listener = listener;
|
||||
this.httpConfig = httpConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,4 +65,42 @@ class HTTP2ServerConnection extends HTTP2Connection
|
|||
LOG.info("Failure while notifying listener " + listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
public HttpChannelOverHTTP2 newHttpChannelOverHTTP2(Connector connector, Stream stream)
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = channels.poll();
|
||||
if (channel!=null)
|
||||
{
|
||||
channel.getHttp2Transport().setStream((IStream)stream);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("recycled :{}/{}",channel,this);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel = new HttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), new HttpTransportOverHTTP2(connector, httpConfig, getEndPoint(), (IStream)stream))
|
||||
{
|
||||
@Override
|
||||
public void onCompleted()
|
||||
{
|
||||
super.onCompleted();
|
||||
recycle();
|
||||
channels.add(this);
|
||||
}
|
||||
};
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("new :{}/{}",channel,this);
|
||||
}
|
||||
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
public boolean onNewStream(Connector connector, Stream stream, HeadersFrame frame)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {} on {}", frame, stream);
|
||||
|
||||
HttpChannelOverHTTP2 channel = newHttpChannelOverHTTP2(connector,stream);
|
||||
channel.onRequest(frame);
|
||||
return frame.isEndStream() ? false : true;
|
||||
}
|
||||
}
|
|
@ -48,23 +48,21 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(HTTP2ServerConnectionFactory.class);
|
||||
|
||||
private final HttpConfiguration httpConfiguration;
|
||||
|
||||
public HTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
|
||||
{
|
||||
this.httpConfiguration = httpConfiguration;
|
||||
super(httpConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
return new HTTPServerSessionListener(connector, httpConfiguration, endPoint);
|
||||
return new HTTPServerSessionListener(connector, endPoint);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ServerParser newServerParser(ByteBufferPool byteBufferPool, ServerParser.Listener listener)
|
||||
{
|
||||
return new ServerParser(byteBufferPool, listener, getMaxDynamicTableSize(), httpConfiguration.getRequestHeaderSize());
|
||||
return new ServerParser(byteBufferPool, listener, getMaxDynamicTableSize(), getHttpConfiguration().getRequestHeaderSize());
|
||||
}
|
||||
|
||||
|
||||
|
@ -82,17 +80,11 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
|
||||
{
|
||||
private final Connector connector;
|
||||
private final HttpConfiguration httpConfiguration;
|
||||
private final EndPoint endPoint;
|
||||
|
||||
// TODO This pool should be on the HTTP2ServerConnection
|
||||
// TODO Evaluate if this is be best data structure to use
|
||||
private final ConcurrentLinkedQueue<HttpChannelOverHTTP2> channels = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public HTTPServerSessionListener(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint)
|
||||
public HTTPServerSessionListener(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
this.connector = connector;
|
||||
this.httpConfiguration = httpConfiguration;
|
||||
this.endPoint = endPoint;
|
||||
}
|
||||
|
||||
|
@ -111,35 +103,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {} on {}", frame, stream);
|
||||
|
||||
HttpChannelOverHTTP2 channel = channels.poll();
|
||||
if (channel!=null)
|
||||
{
|
||||
channel.getHttp2Transport().setStream((IStream)stream);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("recycled :{}",channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)stream))
|
||||
{
|
||||
@Override
|
||||
public void onCompleted()
|
||||
{
|
||||
super.onCompleted();
|
||||
recycle();
|
||||
// TODO limit size?
|
||||
channels.add(this);
|
||||
}
|
||||
};
|
||||
}
|
||||
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
|
||||
|
||||
channel.onRequest(frame);
|
||||
|
||||
return frame.isEndStream() ? null : this;
|
||||
return ((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector,stream,frame)?this:null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -128,10 +128,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
@Override
|
||||
public void succeeded(Stream pushStream)
|
||||
{
|
||||
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)pushStream);
|
||||
HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport);
|
||||
pushStream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
|
||||
|
||||
HTTP2ServerConnection connection = (HTTP2ServerConnection)endPoint.getConnection();
|
||||
HttpChannelOverHTTP2 channel = connection.newHttpChannelOverHTTP2(connector,pushStream);
|
||||
channel.onPushRequest(request);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,13 +23,15 @@ import org.eclipse.jetty.http2.parser.ServerParser;
|
|||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
|
||||
public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory
|
||||
{
|
||||
private final ServerSessionListener listener;
|
||||
|
||||
public RawHTTP2ServerConnectionFactory(ServerSessionListener listener)
|
||||
public RawHTTP2ServerConnectionFactory(HttpConfiguration httpConfiguration,ServerSessionListener listener)
|
||||
{
|
||||
super(httpConfiguration);
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ public class AbstractServerTest
|
|||
|
||||
protected void startServer(ServerSessionListener listener) throws Exception
|
||||
{
|
||||
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
|
||||
prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(),listener));
|
||||
server.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -360,7 +360,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("unconsumed async input {}", this);
|
||||
_channel.abort();
|
||||
_channel.abort(new IOException("unconsumed input"));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue