Notifying client connection's promise from onOpen().

The client connection's promise was succeeded in the context of a
call to SelectorManager.newConnection().
This was wrong, since succeeding the promise may trigger the send
of a request *before* the connection is actually linked to the
endPoint and opened.

This change moves the succeeding of the client connection's promise
to the connection onOpen() method.
This commit is contained in:
Simone Bordet 2015-02-20 18:23:00 +01:00
parent 08b4bd439e
commit 58ea526b56
10 changed files with 65 additions and 29 deletions

View File

@ -49,6 +49,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
@Override
public void succeeded()
{
send();
}
@Override

View File

@ -50,15 +50,16 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.succeeded(connection);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return connection;
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination);
return new HttpConnectionOverHTTP(endPoint, destination, promise);
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -38,13 +39,15 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
private final AtomicBoolean closed = new AtomicBoolean();
private final Promise<Connection> promise;
private final Delegate delegate;
private final HttpChannelOverHTTP channel;
private long idleTimeout;
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination)
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
super(endPoint, destination.getHttpClient().getExecutor());
this.promise = promise;
this.delegate = new Delegate(destination);
this.channel = newHttpChannel();
}
@ -80,7 +83,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
{
super.onOpen();
fillInterested();
getHttpDestination().send();
promise.succeeded(this);
}
public boolean isClosed()

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@ -35,6 +36,7 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
@ -75,9 +77,9 @@ public class HttpClientFailureTest
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination);
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination, promise);
connectionRef.set(connection);
return connection;
}
@ -119,9 +121,9 @@ public class HttpClientFailureTest
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination);
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination, promise);
connectionRef.set(connection);
return connection;
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
@ -37,6 +38,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.Promise;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -60,7 +62,7 @@ public class HttpReceiverOverHTTPTest
client.start();
destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
endPoint = new ByteArrayEndPoint();
connection = new HttpConnectionOverHTTP(endPoint, destination);
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
}
@After
@ -204,7 +206,7 @@ public class HttpReceiverOverHTTPTest
@Test
public void test_FillInterested_RacingWith_BufferRelease() throws Exception
{
connection = new HttpConnectionOverHTTP(endPoint, destination)
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>())
{
@Override
protected HttpChannelOverHTTP newHttpChannel()

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@ -34,6 +35,7 @@ import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Promise;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -65,7 +67,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
@ -98,7 +100,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
@ -127,7 +129,7 @@ public class HttpSenderOverHTTPTest
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
@ -156,7 +158,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
@ -191,7 +193,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
@ -225,7 +227,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
@ -260,7 +262,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";

View File

@ -69,15 +69,19 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, isMultiplexed());
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.succeeded(connection);
HttpConnectionOverFCGI connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return connection;
}
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverFCGI(endPoint, destination, promise, isMultiplexed());
}
protected void customize(Request request, HttpFields fastCGIHeaders)
{
fastCGIHeaders.put(FCGI.Headers.DOCUMENT_ROOT, getScriptRoot());

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.CompletableCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -56,17 +57,19 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private final LinkedList<Integer> requests = new LinkedList<>();
private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final Flusher flusher;
private final HttpDestination destination;
private final Promise<Connection> promise;
private final boolean multiplexed;
private final Flusher flusher;
private final Delegate delegate;
private final ClientParser parser;
private ByteBuffer buffer;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
{
super(endPoint, destination.getHttpClient().getExecutor());
this.destination = destination;
this.promise = promise;
this.multiplexed = multiplexed;
this.flusher = new Flusher(endPoint);
this.delegate = new Delegate(destination);
@ -95,6 +98,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
{
super.onOpen();
fillInterested();
promise.succeeded(this);
}
@Override

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -37,6 +38,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements HttpClientTransport
{
private final HTTP2Client client;
private ClientConnectionFactory connectionFactory;
private HttpClient httpClient;
public HttpClientTransportOverHTTP2(HTTP2Client client)
@ -49,6 +51,16 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
{
addBean(client);
super.doStart();
this.connectionFactory = client.getClientConnectionFactory();
client.setClientConnectionFactory(new ClientConnectionFactory()
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
});
}
@Override
@ -91,9 +103,9 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
final Promise<Session> promise = new Promise<Session>()
{
@Override
public void succeeded(Session result)
public void succeeded(Session session)
{
connection.succeeded(new HttpConnectionOverHTTP2(destination, result));
connection.succeeded(newHttpConnection(destination, session));
}
@Override
@ -109,7 +121,11 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
final HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
return connectionFactory.newConnection(endPoint, context);
}
protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination, Session session)
{
return new HttpConnectionOverHTTP2(destination, session);
}
}

View File

@ -105,6 +105,7 @@ public abstract class AbstractTest
{
HTTP2Client http2Client = new HTTP2Client();
http2Client.setExecutor(clientThreads);
http2Client.setSelectors(1);
return new HttpClientTransportOverHTTP2(http2Client);
}
default: