453487 Recycle HttpChannelOverHTTP2

This is an initial recyling of the channel.   The pool is a concurrent linked list, but it may be better to use an array backed list/stack that can easily enforce a maximum pool size.
The pool is currently on the session listener, when it really should be on the HTTP2ServerConnection, but more refactoring is needed for that.
Also the pool is currently inaccessible to the push mechanism
This commit is contained in:
Greg Wilkins 2014-11-28 12:06:14 +11:00
parent 007b7dac1c
commit 0a144ed3ac
5 changed files with 133 additions and 53 deletions

View File

@ -18,12 +18,9 @@
package org.eclipse.jetty.http2.server;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
@ -111,34 +108,4 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
protected abstract ServerParser newServerParser(ByteBufferPool byteBufferPool, ServerParser.Listener listener);
private static class HTTP2ServerConnection extends HTTP2Connection
{
private final ServerSessionListener listener;
private HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
this.listener = listener;
}
@Override
public void onOpen()
{
super.onOpen();
notifyConnect(getSession());
}
private void notifyConnect(ISession session)
{
try
{
listener.onAccept(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
}
}

View File

@ -0,0 +1,60 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
class HTTP2ServerConnection extends HTTP2Connection
{
// TODO the recycle pool of HttpChannelOverHTTP2 should be here
private final ServerSessionListener listener;
HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
this.listener = listener;
}
@Override
public void onOpen()
{
super.onOpen();
notifyConnect(getSession());
}
private void notifyConnect(ISession session)
{
try
{
listener.onAccept(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
@ -64,12 +65,17 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return new ServerParser(byteBufferPool, listener, getMaxHeaderTableSize(), httpConfiguration.getRequestHeaderSize());
}
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
{
private final Connector connector;
private final HttpConfiguration httpConfiguration;
private final EndPoint endPoint;
// TODO This pool should be on the HTTP2ServerConnection
// TODO Evaluate if this is be best data structure to use
private final ConcurrentLinkedQueue<HttpChannelOverHTTP2> channels = new ConcurrentLinkedQueue<>();
public HTTPServerSessionListener(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint)
{
this.connector = connector;
@ -95,11 +101,27 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
MetaData.Request request = (MetaData.Request)frame.getMetaData();
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)stream, request);
HttpInputOverHTTP2 input = new HttpInputOverHTTP2();
// TODO pool HttpChannels per connection - maybe associate with thread?
HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport, input, stream);
HttpChannelOverHTTP2 channel = channels.poll();
if (channel!=null)
{
channel.getHttp2Transport().setStream((IStream)stream);
if (LOG.isDebugEnabled())
LOG.debug("recycled :{}",channel);
}
else
{
channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)stream))
{
@Override
public void onCompleted()
{
super.onCompleted();
recycle();
// TODO limit size?
channels.add(this);
}
};
}
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
channel.onRequest(frame);
@ -143,5 +165,6 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
final Session session = stream.getSession();
session.close(ErrorCodes.PROTOCOL_ERROR, reason, Callback.Adapter.INSTANCE);
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@ -37,7 +38,6 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -48,13 +48,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION=new PreEncodedHttpField(HttpHeader.SERVER,HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY=new PreEncodedHttpField(HttpHeader.X_POWERED_BY,HttpConfiguration.SERVER_VERSION);
private final Stream stream; // TODO recycle channel for new Stream?
private boolean _expect100Continue = false;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input, Stream stream)
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
super(connector, configuration, endPoint, transport, input);
this.stream = stream;
super(connector, configuration, endPoint, transport, new HttpInputOverHTTP2());
}
@Override
@ -83,6 +81,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
LOG.debug("HTTP2 Request #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
System.lineSeparator(), fields);
@ -98,6 +97,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
LOG.debug("HTTP2 PUSH Request #{}/{}:{}{} {} {}{}{}",
stream.getId(),Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
System.lineSeparator(), request.getFields());
@ -106,12 +106,25 @@ public class HttpChannelOverHTTP2 extends HttpChannel
execute(this);
}
public HttpTransportOverHTTP2 getHttp2Transport()
{
return (HttpTransportOverHTTP2)getHttpTransport();
}
@Override
public void recycle()
{
super.recycle();
getHttp2Transport().recycle();
}
@Override
protected void commit(MetaData.Response info)
{
super.commit(info);
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
LOG.debug("HTTP2 Commit Response #{}/{}:{}{} {} {}{}{}",
stream.getId(),Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
@ -128,8 +141,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel
copy.put(original).flip();
if (LOG.isDebugEnabled())
{
Stream stream=getHttp2Transport().getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", stream.getId(),Integer.toHexString(stream.getSession().hashCode()), copy.remaining());
}
onContent(new HttpInput.Content(copy)
{
@Override

View File

@ -21,8 +21,6 @@ package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
@ -51,18 +49,22 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private final Connector connector;
private final HttpConfiguration httpConfiguration;
private final EndPoint endPoint;
private final IStream stream;
private final MetaData.Request request;
private IStream stream;
public HttpTransportOverHTTP2(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint, IStream stream, MetaData.Request request)
public HttpTransportOverHTTP2(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint, IStream stream)
{
this.connector = connector;
this.httpConfiguration = httpConfiguration;
this.endPoint = endPoint;
this.stream = stream;
this.request = request;
}
public void recycle()
{
this.stream=null;
commit.set(false);
}
@Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
{
@ -126,9 +128,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void succeeded(Stream pushStream)
{
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)pushStream, request);
HttpInputOverHTTP2 input = new HttpInputOverHTTP2();
HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport, input, pushStream);
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)pushStream);
HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport);
pushStream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
channel.onPushRequest(request);
@ -198,4 +199,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
}
}
public void setStream(IStream stream)
{
if (LOG.isDebugEnabled())
LOG.debug("{} setStream {}",this, stream.getId());
this.stream=stream;
}
public Stream getStream()
{
return stream;
}
}