Simplified connection establishment code.

This commit is contained in:
Simone Bordet 2012-11-23 13:16:06 +01:00
parent ee893d8526
commit b7cdb29a14
1 changed files with 33 additions and 34 deletions

View File

@ -159,7 +159,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
public Future<Connection> newConnection() public Future<Connection> newConnection()
{ {
FuturePromise<Connection> result = new FuturePromise<>(); FuturePromise<Connection> result = new FuturePromise<>();
newConnection(new CONNECTPromise(result)); newConnection(new ProxyPromise(result));
return result; return result;
} }
@ -191,7 +191,8 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
{ {
LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this); LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
CONNECTPromise connectPromise = new CONNECTPromise(new Promise<Connection>() // This is the promise that is being called when a connection (eventually proxied) succeeds or fails.
Promise<Connection> promise = new Promise<Connection>()
{ {
@Override @Override
public void succeeded(Connection connection) public void succeeded(Connection connection)
@ -211,8 +212,29 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
} }
}); });
} }
};
// Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
// Differently from the case where the connection is created explicitly by applications, here
// we need to do a bit more logging and keep track of the connection count in case of failures.
newConnection(new ProxyPromise<Connection>(promise)
{
@Override
public void succeeded(Connection connection)
{
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
super.succeeded(connection);
}
@Override
public void failed(Throwable x)
{
LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
connectionCount.decrementAndGet();
super.failed(x);
}
}); });
newConnection(new TCPPromise(next, maxConnections, connectPromise));
// Try again the idle connections // Try again the idle connections
return idleConnections.poll(); return idleConnections.poll();
} }
@ -423,40 +445,17 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
} }
} }
private class TCPPromise implements Promise<Connection> /**
{ * Decides whether to establish a proxy tunnel using HTTP CONNECT.
private final int current; * It is implemented as a promise because it needs to establish the tunnel
private final int max; * when the TCP connection is succeeded, and needs to notify another
private final Promise<Connection> delegate; * promise when the tunnel is established (or failed).
*/
private TCPPromise(int current, int max, Promise<Connection> delegate) private class ProxyPromise implements Promise<Connection>
{
this.current = current;
this.max = max;
this.delegate = delegate;
}
@Override
public void succeeded(Connection connection)
{
LOG.debug("Created connection {}/{} {} for {}", current, max, connection, HttpDestination.this);
delegate.succeeded(connection);
}
@Override
public void failed(Throwable x)
{
LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
connectionCount.decrementAndGet();
delegate.failed(x);
}
}
private class CONNECTPromise implements Promise<Connection>
{ {
private final Promise<Connection> delegate; private final Promise<Connection> delegate;
private CONNECTPromise(Promise<Connection> delegate) private ProxyPromise(Promise<Connection> delegate)
{ {
this.delegate = delegate; this.delegate = delegate;
} }