Code cleanups.

Added TODOs to remember the link between the HTTP2Connection
fill callback and HTTP2ConnectionFactory.reservedThreads.
This commit is contained in:
Simone Bordet 2017-09-07 15:43:07 +02:00
parent ec9a4ecdc1
commit 006dee439f
3 changed files with 36 additions and 51 deletions

View File

@ -47,7 +47,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String CLIENT_CONTEXT_KEY = "http2.client"; public static final String CLIENT_CONTEXT_KEY = "http2.client";
public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool"; public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor"; public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
public static final String PREALLOCATED_EXECUTOR_CONTEXT_KEY = "http2.client.preallocatedExecutor";
public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler"; public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener"; public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise"; public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
@ -60,7 +59,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY); HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY); ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY); Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
ReservedThreadExecutor preallocatedExecutor = (ReservedThreadExecutor)context.get(PREALLOCATED_EXECUTOR_CONTEXT_KEY);
Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY); Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY); Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -71,37 +69,29 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192); Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
if (preallocatedExecutor==null) ReservedThreadExecutor reservedExecutor = provideReservedThreadExecutor(client, executor);
{
// TODO move this to non lazy construction
preallocatedExecutor=client.getBean(ReservedThreadExecutor.class);
if (preallocatedExecutor==null)
{
synchronized (this)
{
if (preallocatedExecutor==null)
{
try
{
preallocatedExecutor = new ReservedThreadExecutor(executor,1); // TODO configure size
preallocatedExecutor.start();
client.addBean(preallocatedExecutor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}
}
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, preallocatedExecutor, endPoint, HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, reservedExecutor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener); parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener); connection.addListener(connectionListener);
return customize(connection, context); return customize(connection, context);
} }
protected ReservedThreadExecutor provideReservedThreadExecutor(HTTP2Client client, Executor executor)
{
synchronized (this)
{
ReservedThreadExecutor reservedExecutor = client.getBean(ReservedThreadExecutor.class);
if (reservedExecutor == null)
{
// TODO: see HTTP2Connection.FillableCallback
reservedExecutor = new ReservedThreadExecutor(executor, 0);
client.addManaged(reservedExecutor);
}
return reservedExecutor;
}
}
private class HTTP2ClientConnection extends HTTP2Connection implements Callback private class HTTP2ClientConnection extends HTTP2Connection implements Callback
{ {
private final HTTP2Client client; private final HTTP2Client client;

View File

@ -58,7 +58,6 @@ public class HTTP2Connection extends AbstractConnection
this.session = session; this.session = session;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor); this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);
LifeCycle.start(strategy); LifeCycle.start(strategy);
} }
@ -274,6 +273,8 @@ public class HTTP2Connection extends AbstractConnection
@Override @Override
public InvocationType getInvocationType() public InvocationType getInvocationType()
{ {
// TODO: see also AbstractHTTP2ServerConnectionFactory.reservedThreads.
// TODO: it's non blocking here because reservedThreads=0.
return InvocationType.NON_BLOCKING; return InvocationType.NON_BLOCKING;
} }
} }

View File

@ -154,6 +154,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
public void setReservedThreads(int threads) public void setReservedThreads(int threads)
{ {
// TODO: see also HTTP2Connection.FillableCallback.
// TODO: currently disabled since the only value that works is 0. // TODO: currently disabled since the only value that works is 0.
// this.reservedThreads = threads; // this.reservedThreads = threads;
} }
@ -183,28 +184,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
session.setStreamIdleTimeout(streamIdleTimeout); session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
ReservedThreadExecutor executor = connector.getBean(ReservedThreadExecutor.class); ReservedThreadExecutor executor = provideReservedThreadExecutor(connector);
if (executor==null)
{
synchronized (this)
{
executor = connector.getBean(ReservedThreadExecutor.class);
if (executor==null)
{
try
{
executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads());
executor.start();
connector.addBean(executor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}
ServerParser parser = newServerParser(connector, session); ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor, HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
@ -213,6 +193,20 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
return configure(connection, connector, endPoint); return configure(connection, connector, endPoint);
} }
protected ReservedThreadExecutor provideReservedThreadExecutor(Connector connector)
{
synchronized (this)
{
ReservedThreadExecutor executor = getBean(ReservedThreadExecutor.class);
if (executor == null)
{
executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads());
addManaged(executor);
}
return executor;
}
}
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint); protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
protected ServerParser newServerParser(Connector connector, ServerParser.Listener listener) protected ServerParser newServerParser(Connector connector, ServerParser.Listener listener)