Using EndPoint.upgrade() in client code when connections are upgraded

and removed ClientConnectionFactory.Helper.
This commit is contained in:
Simone Bordet 2015-10-21 14:32:08 +02:00
parent 20072252c0
commit 3fc6320881
7 changed files with 19 additions and 74 deletions

View File

@ -179,10 +179,7 @@ public class HttpProxy extends ProxyConfiguration.Proxy
ClientConnectionFactory sslConnectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
Helper.replaceConnection(oldConnection, newConnection);
// Avoid setting fill interest in the old Connection,
// without closing the underlying EndPoint.
oldConnection.softClose();
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}

View File

@ -198,10 +198,10 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
ClientConnectionFactory connectionFactory = this.connectionFactory;
if (HttpScheme.HTTPS.is(destination.getScheme()))
connectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
org.eclipse.jetty.io.Connection connection = connectionFactory.newConnection(getEndPoint(), context);
ClientConnectionFactory.Helper.replaceConnection(this, connection);
org.eclipse.jetty.io.Connection newConnection = connectionFactory.newConnection(getEndPoint(), context);
getEndPoint().upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("SOCKS4 tunnel established: {} over {}", this, connection);
LOG.debug("SOCKS4 tunnel established: {} over {}", this, newConnection);
}
catch (Throwable x)
{

View File

@ -96,11 +96,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
EndPoint endPoint = connection.getEndPoint();
while (true)
{
// Connection may be closed in a parser callback.
if (connection.isClosed())
boolean upgraded = connection != endPoint.getConnection();
// Connection may be closed or upgraded in a parser callback.
if (connection.isClosed() || upgraded)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed", connection);
LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
releaseBuffer();
return;
}

View File

@ -163,7 +163,7 @@ public class HttpClientCustomProxyTest
{
super.onOpen();
fillInterested();
getEndPoint().write(new Callback.Adapter(), ByteBuffer.wrap(CAFE_BABE));
getEndPoint().write(Callback.NOOP, ByteBuffer.wrap(CAFE_BABE));
}
@Override
@ -177,7 +177,7 @@ public class HttpClientCustomProxyTest
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(getEndPoint(), context));
getEndPoint().upgrade(connectionFactory.newConnection(getEndPoint(), context));
}
catch (Throwable x)
{
@ -232,10 +232,10 @@ public class HttpClientCustomProxyTest
int filled = getEndPoint().fill(buffer);
Assert.assertEquals(4, filled);
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
getEndPoint().write(new Callback.Adapter(), buffer);
getEndPoint().write(Callback.NOOP, buffer);
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(connector, getEndPoint()));
getEndPoint().upgrade(connectionFactory.newConnection(connector, getEndPoint()));
}
catch (Throwable x)
{

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
@ -62,7 +61,8 @@ public class HttpReceiverOverHTTPTest
client.start();
destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
endPoint = new ByteArrayEndPoint();
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<>());
endPoint.setConnection(connection);
}
@After
@ -207,7 +207,7 @@ public class HttpReceiverOverHTTPTest
@Test
public void test_FillInterested_RacingWith_BufferRelease() throws Exception
{
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>())
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<>())
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
@ -234,7 +234,8 @@ public class HttpReceiverOverHTTPTest
};
}
};
endPoint.setConnection(connection);
// Partial response to trigger the call to fillInterested().
endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" +

View File

@ -19,12 +19,8 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Factory for client-side {@link Connection} instances.
*/
@ -38,53 +34,4 @@ public interface ClientConnectionFactory
* @throws IOException if the connection cannot be created
*/
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException;
public static class Helper
{
private static Logger LOG = Log.getLogger(Helper.class);
private Helper()
{
}
/**
* Replaces the given {@code oldConnection} with the given {@code newConnection} on the
* {@link EndPoint} associated with {@code oldConnection}, performing connection lifecycle management.
* <p>
* The {@code oldConnection} will be closed by invoking {@link org.eclipse.jetty.io.Connection#onClose()}
* and the {@code newConnection} will be opened by invoking {@link org.eclipse.jetty.io.Connection#onOpen()}.
* @param oldConnection the old connection to replace
* @param newConnection the new connection replacement
*/
public static void replaceConnection(Connection oldConnection, Connection newConnection)
{
close(oldConnection);
oldConnection.getEndPoint().setConnection(newConnection);
open(newConnection);
}
private static void open(Connection connection)
{
try
{
connection.onOpen();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
private static void close(Connection connection)
{
try
{
connection.onClose();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
}
}

View File

@ -108,9 +108,7 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
EndPoint endPoint = getEndPoint();
try
{
Connection oldConnection = endPoint.getConnection();
Connection newConnection = connectionFactory.newConnection(endPoint, context);
ClientConnectionFactory.Helper.replaceConnection(oldConnection, newConnection);
endPoint.upgrade(connectionFactory.newConnection(endPoint, context));
}
catch (Throwable x)
{