484876 - Make simpler to customize the FlowControlStrategy.
Introduced FlowControlStrategy.Factory along with getters and setters in relevant classes.
This commit is contained in:
parent
30557429cf
commit
30694b675b
|
@ -29,6 +29,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
|
||||
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
@ -124,6 +125,7 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
|
||||
private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
|
@ -212,6 +214,16 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
this.connectionFactory = connectionFactory;
|
||||
}
|
||||
|
||||
public FlowControlStrategy.Factory getFlowControlStrategyFactory()
|
||||
{
|
||||
return flowControlStrategyFactory;
|
||||
}
|
||||
|
||||
public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
|
||||
{
|
||||
this.flowControlStrategyFactory = flowControlStrategyFactory;
|
||||
}
|
||||
|
||||
@ManagedAttribute("The number of selectors")
|
||||
public int getSelectors()
|
||||
{
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.Collections;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Connection;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
|
@ -67,6 +66,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
|
||||
Generator generator = new Generator(byteBufferPool);
|
||||
FlowControlStrategy flowControl = newFlowControlStrategy();
|
||||
if (flowControl == null)
|
||||
flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
|
||||
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
|
||||
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
|
||||
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener);
|
||||
|
@ -74,9 +75,13 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link HTTP2Client#setFlowControlStrategyFactory(FlowControlStrategy.Factory)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return new BufferingFlowControlStrategy(0.5F);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Queue;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.http.HostPortHttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
|
@ -67,19 +66,14 @@ public class FlowControlStalledTest
|
|||
protected HTTP2Client client;
|
||||
protected Server server;
|
||||
|
||||
protected void start(Supplier<FlowControlStrategy> flowControlFactory, ServerSessionListener listener) throws Exception
|
||||
protected void start(FlowControlStrategy.Factory flowControlFactory, ServerSessionListener listener) throws Exception
|
||||
{
|
||||
QueuedThreadPool serverExecutor = new QueuedThreadPool();
|
||||
serverExecutor.setName("server");
|
||||
server = new Server(serverExecutor);
|
||||
connector = new ServerConnector(server, new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener)
|
||||
{
|
||||
@Override
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return flowControlFactory.get();
|
||||
}
|
||||
});
|
||||
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
|
||||
connectionFactory.setFlowControlStrategyFactory(flowControlFactory);
|
||||
connector = new ServerConnector(server, connectionFactory);
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
|
||||
|
@ -87,14 +81,7 @@ public class FlowControlStalledTest
|
|||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client.setExecutor(clientExecutor);
|
||||
client.setClientConnectionFactory(new HTTP2ClientConnectionFactory()
|
||||
{
|
||||
@Override
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return flowControlFactory.get();
|
||||
}
|
||||
});
|
||||
client.setFlowControlStrategyFactory(flowControlFactory);
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -82,14 +82,9 @@ public abstract class FlowControlStrategyTest
|
|||
QueuedThreadPool serverExecutor = new QueuedThreadPool();
|
||||
serverExecutor.setName("server");
|
||||
server = new Server(serverExecutor);
|
||||
connector = new ServerConnector(server, new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener)
|
||||
{
|
||||
@Override
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return FlowControlStrategyTest.this.newFlowControlStrategy();
|
||||
}
|
||||
});
|
||||
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
|
||||
connectionFactory.setFlowControlStrategyFactory(FlowControlStrategyTest.this::newFlowControlStrategy);
|
||||
connector = new ServerConnector(server, connectionFactory);
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
|
||||
|
@ -97,14 +92,7 @@ public abstract class FlowControlStrategyTest
|
|||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client.setExecutor(clientExecutor);
|
||||
client.setClientConnectionFactory(new HTTP2ClientConnectionFactory()
|
||||
{
|
||||
@Override
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return FlowControlStrategyTest.this.newFlowControlStrategy();
|
||||
}
|
||||
});
|
||||
client.setFlowControlStrategyFactory(FlowControlStrategyTest.this::newFlowControlStrategy);
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -41,4 +41,9 @@ public interface FlowControlStrategy
|
|||
public void onDataSending(IStream stream, int length);
|
||||
|
||||
public void onDataSent(IStream stream, int length);
|
||||
|
||||
public interface Factory
|
||||
{
|
||||
public FlowControlStrategy newFlowControlStrategy();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
private int initialStreamSendWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
private int maxConcurrentStreams = -1;
|
||||
private int maxHeaderBlockFragment = 0;
|
||||
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
|
||||
|
||||
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
|
||||
{
|
||||
|
@ -102,6 +103,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
this.maxHeaderBlockFragment = maxHeaderBlockFragment;
|
||||
}
|
||||
|
||||
public FlowControlStrategy.Factory getFlowControlStrategyFactory()
|
||||
{
|
||||
return flowControlStrategyFactory;
|
||||
}
|
||||
|
||||
public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
|
||||
{
|
||||
this.flowControlStrategyFactory = flowControlStrategyFactory;
|
||||
}
|
||||
|
||||
public HttpConfiguration getHttpConfiguration()
|
||||
{
|
||||
return httpConfiguration;
|
||||
|
@ -114,6 +125,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
|
||||
Generator generator = new Generator(connector.getByteBufferPool(), getMaxDynamicTableSize(), getMaxHeaderBlockFragment());
|
||||
FlowControlStrategy flowControl = newFlowControlStrategy();
|
||||
if (flowControl == null)
|
||||
flowControl = getFlowControlStrategyFactory().newFlowControlStrategy();
|
||||
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener, flowControl);
|
||||
session.setMaxLocalStreams(getMaxConcurrentStreams());
|
||||
session.setMaxRemoteStreams(getMaxConcurrentStreams());
|
||||
|
@ -130,9 +143,13 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
|
|||
return configure(connection, connector, endPoint);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #setFlowControlStrategyFactory(FlowControlStrategy.Factory)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
return new BufferingFlowControlStrategy(getInitialStreamSendWindow(), 0.5F);
|
||||
return null;
|
||||
}
|
||||
|
||||
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
|
||||
|
|
Loading…
Reference in New Issue