Refactored HttpDestination to allow customization of the request enqueuing.

Subclasses may override enqueue() and provide a different algorihtm.
This commit is contained in:
Simone Bordet 2015-02-06 15:21:50 +01:00
parent b1953a6859
commit 12eaedd96b
2 changed files with 18 additions and 3 deletions

View File

@ -54,6 +54,11 @@ public class ConnectionPool implements Closeable, Dumpable
this.activeConnections = new BlockingArrayQueue<>(maxConnections); this.activeConnections = new BlockingArrayQueue<>(maxConnections);
} }
public int getConnectionCount()
{
return connectionCount.get();
}
public BlockingQueue<Connection> getIdleConnections() public BlockingQueue<Connection> getIdleConnections()
{ {
return idleConnections; return idleConnections;
@ -76,7 +81,7 @@ public class ConnectionPool implements Closeable, Dumpable
{ {
while (true) while (true)
{ {
int current = connectionCount.get(); int current = getConnectionCount();
final int next = current + 1; final int next = current + 1;
if (next > maxConnections) if (next > maxConnections)

View File

@ -59,7 +59,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
this.client = client; this.client = client;
this.origin = origin; this.origin = origin;
this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination()); this.exchanges = newExchangeQueue(client);
this.requestNotifier = new RequestNotifier(client); this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier(); this.responseNotifier = new ResponseNotifier();
@ -84,6 +84,11 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
hostField = new HttpField(HttpHeader.HOST, host); hostField = new HttpField(HttpHeader.HOST, host);
} }
protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
{
return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
}
protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory) protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
{ {
return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory); return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
@ -168,7 +173,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
if (client.isRunning()) if (client.isRunning())
{ {
if (exchanges.offer(exchange)) if (enqueue(exchanges, exchange))
{ {
if (!client.isRunning() && exchanges.remove(exchange)) if (!client.isRunning() && exchanges.remove(exchange))
{ {
@ -195,6 +200,11 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
} }
} }
protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
{
return queue.offer(exchange);
}
protected abstract void send(); protected abstract void send();
public void newConnection(Promise<Connection> promise) public void newConnection(Promise<Connection> promise)