Improved configurability of HTTP2Client.
This commit is contained in:
parent
d0f0aa7c9f
commit
85edb7e573
|
@ -51,25 +51,50 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
|
||||
public class HTTP2Client extends ContainerLifeCycle
|
||||
{
|
||||
private final Queue<ISession> sessions = new ConcurrentLinkedQueue<>();
|
||||
private final SelectorManager selector;
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
private long idleTimeout;
|
||||
private Executor executor;
|
||||
private Scheduler scheduler;
|
||||
private ByteBufferPool bufferPool;
|
||||
private ClientConnectionFactory connectionFactory;
|
||||
private Queue<ISession> sessions;
|
||||
private SelectorManager selector;
|
||||
public int selectors = 1;
|
||||
public long idleTimeout = 30000;
|
||||
public long connectTimeout = 10000;
|
||||
|
||||
public HTTP2Client()
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
this(new QueuedThreadPool());
|
||||
if (executor == null)
|
||||
setExecutor(new QueuedThreadPool());
|
||||
|
||||
if (scheduler == null)
|
||||
setScheduler(new ScheduledExecutorScheduler());
|
||||
|
||||
if (bufferPool == null)
|
||||
setByteBufferPool(new MappedByteBufferPool());
|
||||
|
||||
if (connectionFactory == null)
|
||||
setClientConnectionFactory(new HTTP2ClientConnectionFactory());
|
||||
|
||||
if (sessions == null)
|
||||
{
|
||||
sessions = new ConcurrentLinkedQueue<>();
|
||||
addBean(sessions);
|
||||
}
|
||||
|
||||
if (selector == null)
|
||||
{
|
||||
selector = newSelectorManager();
|
||||
addBean(selector);
|
||||
}
|
||||
selector.setConnectTimeout(getConnectTimeout());
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
public HTTP2Client(Executor executor)
|
||||
protected SelectorManager newSelectorManager()
|
||||
{
|
||||
addBean(executor);
|
||||
Scheduler scheduler = new ScheduledExecutorScheduler();
|
||||
addBean(scheduler, true);
|
||||
this.selector = new ClientSelectorManager(executor, scheduler);
|
||||
addBean(selector, true);
|
||||
this.byteBufferPool = new MappedByteBufferPool();
|
||||
addBean(byteBufferPool, true);
|
||||
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,6 +104,80 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
super.doStop();
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
}
|
||||
|
||||
public void setExecutor(Executor executor)
|
||||
{
|
||||
this.updateBean(this.executor, executor);
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public Scheduler getScheduler()
|
||||
{
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
public void setScheduler(Scheduler scheduler)
|
||||
{
|
||||
this.updateBean(this.scheduler, scheduler);
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
public ByteBufferPool getByteBufferPool()
|
||||
{
|
||||
return bufferPool;
|
||||
}
|
||||
|
||||
public void setByteBufferPool(ByteBufferPool bufferPool)
|
||||
{
|
||||
this.updateBean(this.bufferPool, bufferPool);
|
||||
this.bufferPool = bufferPool;
|
||||
}
|
||||
|
||||
public ClientConnectionFactory getClientConnectionFactory()
|
||||
{
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
|
||||
{
|
||||
this.updateBean(this.connectionFactory, connectionFactory);
|
||||
this.connectionFactory = connectionFactory;
|
||||
}
|
||||
|
||||
public int getSelectors()
|
||||
{
|
||||
return selectors;
|
||||
}
|
||||
|
||||
public void setSelectors(int selectors)
|
||||
{
|
||||
this.selectors = selectors;
|
||||
}
|
||||
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return idleTimeout;
|
||||
}
|
||||
|
||||
public void setIdleTimeout(long idleTimeout)
|
||||
{
|
||||
this.idleTimeout = idleTimeout;
|
||||
}
|
||||
|
||||
public long getConnectTimeout()
|
||||
{
|
||||
return connectTimeout;
|
||||
}
|
||||
|
||||
public void setConnectTimeout(long connectTimeout)
|
||||
{
|
||||
this.connectTimeout = connectTimeout;
|
||||
}
|
||||
|
||||
public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
|
||||
{
|
||||
connect(null, address, listener, promise);
|
||||
|
@ -123,16 +222,6 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
sessions.clear();
|
||||
}
|
||||
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return idleTimeout;
|
||||
}
|
||||
|
||||
public void setIdleTimeout(long idleTimeout)
|
||||
{
|
||||
this.idleTimeout = idleTimeout;
|
||||
}
|
||||
|
||||
public boolean addSession(ISession session)
|
||||
{
|
||||
return sessions.offer(session);
|
||||
|
@ -145,9 +234,9 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
|
||||
private class ClientSelectorManager extends SelectorManager
|
||||
{
|
||||
private ClientSelectorManager(Executor executor, Scheduler scheduler)
|
||||
private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
|
||||
{
|
||||
super(executor, scheduler);
|
||||
super(executor, scheduler, selectors);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -161,17 +250,17 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> context = (Map<String, Object>)attachment;
|
||||
context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, byteBufferPool);
|
||||
context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
|
||||
context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
|
||||
context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
|
||||
|
||||
ClientConnectionFactory factory = new HTTP2ClientConnectionFactory();
|
||||
ClientConnectionFactory factory = getClientConnectionFactory();
|
||||
|
||||
SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
|
||||
if (sslContextFactory != null)
|
||||
{
|
||||
ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), factory, "h2-14");
|
||||
factory = new SslClientConnectionFactory(sslContextFactory, byteBufferPool, getExecutor(), alpn);
|
||||
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
|
||||
}
|
||||
|
||||
return factory.newConnection(endpoint, context);
|
||||
|
|
|
@ -51,14 +51,12 @@ public class AbstractTest
|
|||
protected void start(HttpServlet servlet) throws Exception
|
||||
{
|
||||
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
|
||||
context.addServlet(new ServletHolder(servlet), servletPath + "/*");
|
||||
customizeContext(context);
|
||||
server.start();
|
||||
|
||||
prepareClient();
|
||||
|
||||
server.start();
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
@ -69,8 +67,9 @@ public class AbstractTest
|
|||
protected void start(ServerSessionListener listener) throws Exception
|
||||
{
|
||||
prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(),listener));
|
||||
prepareClient();
|
||||
server.start();
|
||||
|
||||
prepareClient();
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
@ -85,9 +84,10 @@ public class AbstractTest
|
|||
|
||||
private void prepareClient()
|
||||
{
|
||||
client = new HTTP2Client();
|
||||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client = new HTTP2Client(clientExecutor);
|
||||
client.setExecutor(clientExecutor);
|
||||
}
|
||||
|
||||
protected Session newClient(Session.Listener listener) throws Exception
|
||||
|
|
|
@ -89,7 +89,7 @@ public class BufferingFlowControlStrategyTest extends FlowControlStrategyTest
|
|||
});
|
||||
|
||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Wait a little more for the window updates to be processed.
|
||||
Thread.sleep(1000);
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.eclipse.jetty.http2.ErrorCode;
|
|||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
|
@ -84,9 +85,19 @@ public abstract class FlowControlStrategyTest
|
|||
server.addConnector(connector);
|
||||
server.start();
|
||||
|
||||
client = new HTTP2Client();
|
||||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client = new HTTP2Client(clientExecutor);
|
||||
client.setExecutor(clientExecutor);
|
||||
client.setClientConnectionFactory(new HTTP2ClientConnectionFactory()
|
||||
{
|
||||
@Override
|
||||
protected FlowControlStrategy newFlowControlStrategy()
|
||||
{
|
||||
// return FlowControlStrategyTest.this.newFlowControlStrategy();
|
||||
return new SimpleFlowControlStrategy();
|
||||
}
|
||||
});
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ public class SimpleFlowControlStrategyTest extends FlowControlStrategyTest
|
|||
});
|
||||
|
||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Wait a little more for the window updates to be processed.
|
||||
Thread.sleep(1000);
|
||||
|
|
|
@ -338,6 +338,6 @@ public class StreamResetTest extends AbstractTest
|
|||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.eclipse.jetty.util.Promise;
|
|||
public class HttpClientTransportOverHTTP2 implements HttpClientTransport
|
||||
{
|
||||
private final HTTP2Client client;
|
||||
// private final ClientConnectionFactory connectionFactory;
|
||||
private HttpClient httpClient;
|
||||
|
||||
public HttpClientTransportOverHTTP2(HTTP2Client client)
|
||||
|
|
|
@ -51,7 +51,7 @@ public abstract class AbstractTest
|
|||
{
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("h2-client");
|
||||
http2Client = new HTTP2Client(clientThreads);
|
||||
http2Client = HTTP2Client.builder().executor(clientThreads).build();
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: mod:{0}")
|
||||
|
|
|
@ -217,7 +217,7 @@ public class HttpClientTest extends AbstractTest
|
|||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.method(HttpMethod.POST)
|
||||
.content(new BytesContentProvider(bytes))
|
||||
.timeout(555, TimeUnit.SECONDS)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(500, response.getStatus());
|
||||
|
@ -255,7 +255,7 @@ public class HttpClientTest extends AbstractTest
|
|||
FutureResponseListener listener = new FutureResponseListener(request);
|
||||
request.method(HttpMethod.POST)
|
||||
.content(content)
|
||||
.timeout(555, TimeUnit.SECONDS)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(listener);
|
||||
|
||||
content.offer(ByteBuffer.wrap(chunk1));
|
||||
|
|
Loading…
Reference in New Issue