jetty-9 refactored onOpen and onClose

This commit is contained in:
Greg Wilkins 2012-09-14 17:38:47 +10:00
parent fb2ca29364
commit 22ba57b8d2
23 changed files with 202 additions and 144 deletions

View File

@ -518,7 +518,6 @@ public class HttpClient extends AggregateLifeCycle
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination);
appEndPoint.setConnection(connection);
callback.callback.completed(connection);
connection.onOpen();
return sslConnection;
}
@ -531,6 +530,7 @@ public class HttpClient extends AggregateLifeCycle
}
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
@ -543,6 +543,18 @@ public class HttpClient extends AggregateLifeCycle
{
getExecutor().execute(task);
}
@Override
public void connectionOpened(org.eclipse.jetty.io.Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(org.eclipse.jetty.io.Connection connection)
{
connection.onClose();
}
}
private class ConnectionCallback extends FutureCallback<Connection>

View File

@ -38,6 +38,7 @@ public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback<Void> _readCallback;
@ -206,10 +207,17 @@ public abstract class AbstractConnection implements Connection
return true;
}
// TODO remove this when open/close refactored
final AtomicReference<Throwable> _opened = new AtomicReference<>(null);
@Override
public void onOpen()
{
LOG.debug("{} opened",this);
if (!_opened.compareAndSet(null,new Throwable()))
{
LOG.warn("ALREADY OPENED ",_opened.get());
LOG.warn("EXTRA OPEN AT ",new Throwable());
}
}
@Override
@ -230,6 +238,24 @@ public abstract class AbstractConnection implements Connection
getEndPoint().close();
}
@Override
public int getMessagesIn()
{
return 0;
}
@Override
public int getMessagesOut()
{
return 0;
}
@Override
public long getCreatedTimeStamp()
{
return _created;
}
@Override
public String toString()
{

View File

@ -53,5 +53,12 @@ public interface Connection extends AutoCloseable
* {@link EndPoint} but, for example, SSL connections should write the SSL close message
* before closing the associated {@link EndPoint}.</p>
*/
@Override
public void close();
public int getMessagesIn();
public int getMessagesOut();
public long getCreatedTimeStamp();
}

View File

@ -102,19 +102,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
updateLocalInterests(SelectionKey.OP_WRITE, true);
}
@Override
public void setConnection(Connection connection)
{
// TODO should this be on AbstractEndPoint?
Connection old = getConnection();
super.setConnection(connection);
if (old != null && old != connection)
{
LOG.debug("Upgrading connection {} -> {} on endPoint {}", old, connection, this);
_selector.getSelectorManager().connectionUpgraded(this, old);
}
}
@Override
public void onSelected()
{

View File

@ -177,32 +177,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just opened
*/
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
abstract protected void connectionOpened(Connection connection);
/**
* <p>Callback method invoked when a connection is closed.</p>
*
* @param connection the connection just closed
*/
public void connectionClosed(Connection connection)
{
connection.onClose();
}
/**
* <p>Callback method invoked when a connection is upgraded.</p>
*
* @param endpoint the endpoint holding the new connection
* @param oldConnection the previous connection
*/
public void connectionUpgraded(EndPoint endpoint, Connection oldConnection)
{
connectionClosed(oldConnection);
connectionOpened(endpoint.getConnection());
}
abstract protected void connectionClosed(Connection connection);
/**
* <p>Callback method invoked when a non-blocking connect cannot be completed.</p>

View File

@ -132,7 +132,7 @@ public class SslConnection extends AbstractConnection
return _sslEngine;
}
public EndPoint getDecryptedEndPoint()
public DecryptedEndPoint getDecryptedEndPoint()
{
return _decryptedEndPoint;
}
@ -142,20 +142,35 @@ public class SslConnection extends AbstractConnection
{
try
{
super.onOpen();
// Begin the handshake
_sslEngine.beginHandshake();
// TODO: check that it is ok to remove this code
// if (_sslEngine.getUseClientMode())
// getExecutor().execute(_runWriteEmpty);
}
catch (SSLException x)
{
getEndPoint().close();
throw new RuntimeIOException(x);
}
super.onOpen();
}
@Override
public void onClose()
{
_decryptedEndPoint.getConnection().onClose();
super.onClose();
}
@Override
public int getMessagesIn()
{
return _decryptedEndPoint.getConnection().getMessagesIn();
}
@Override
public int getMessagesOut()
{
return _decryptedEndPoint.getConnection().getMessagesOut();
}
@Override
@ -421,8 +436,6 @@ public class SslConnection extends AbstractConnection
a.setInputBufferSize(_sslEngine.getSession().getApplicationBufferSize());
}
connection.onOpen();
super.setConnection(connection);
}
@ -434,10 +447,6 @@ public class SslConnection extends AbstractConnection
@Override
public synchronized int fill(ByteBuffer buffer) throws IOException
{
// TODO remove this when we are certain it is OK
if (Thread.currentThread().getName().contains("selector"))
new Throwable().printStackTrace();
LOG.debug("{} fill enter", SslConnection.this);
try
{
@ -639,10 +648,6 @@ public class SslConnection extends AbstractConnection
@Override
public synchronized boolean flush(ByteBuffer... appOuts) throws IOException
{
// TODO remove this when we are certain it is OK
if (Thread.currentThread().getName().contains("selector"))
new Throwable().printStackTrace();
// The contract for flush does not require that all appOuts bytes are written
// or even that any appOut bytes are written! If the connection is write block
// or busy handshaking, then zero bytes may be taken from appOuts and this method

View File

@ -101,6 +101,19 @@ public class SelectChannelEndPointInterestsTest
}
};
}
@Override
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(Connection connection)
{
connection.onClose();
}
};
selectorManager.start();
}

View File

@ -79,6 +79,18 @@ public class SelectChannelEndPointTest
_lastEndPointLatch.countDown();
return endp;
}
@Override
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(Connection connection)
{
connection.onClose();
}
};
// Must be volatile or the test may fail spuriously

View File

@ -88,6 +88,18 @@ public class SslConnectionTest
_lastEndp=endp;
return endp;
}
@Override
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(Connection connection)
{
connection.onClose();
}
};
// Must be volatile or the test may fail spuriously
@ -244,29 +256,26 @@ public class SslConnectionTest
{
_testFill=false;
for (int i=0;i<1;i++)
{
_writeCallback = new FutureCallback<>();
Socket client = newClient();
client.setSoTimeout(600000); // TODO reduce after debugging
_writeCallback = new FutureCallback<>();
Socket client = newClient();
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals("Hello Client",new String(buffer,0,len,StringUtil.__UTF8_CHARSET));
Assert.assertEquals(null,_writeCallback.get(100,TimeUnit.MILLISECONDS));
client.close();
}
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals("Hello Client",new String(buffer,0,len,StringUtil.__UTF8_CHARSET));
Assert.assertEquals(null,_writeCallback.get(100,TimeUnit.MILLISECONDS));
client.close();
}
@Test
public void testManyLines() throws Exception
{
final Socket client = newClient();
client.setSoTimeout(60000);
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);

View File

@ -391,21 +391,21 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
protected void connectionOpened(Connection connection)
{
_stats.connectionOpened();
}
protected void connectionUpgraded(Connection oldConnection, Connection newConnection)
{
long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp();
int requests = (oldConnection instanceof HttpConnection) ? ((HttpConnection)oldConnection).getHttpChannel().getRequests() : 0;
_stats.connectionUpgraded(duration, requests, requests);
connection.onOpen();
}
protected void connectionClosed(Connection connection)
{
connection.onClose();
long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
// TODO: remove casts to HttpConnection
int requests = (connection instanceof HttpConnection) ? ((HttpConnection)connection).getHttpChannel().getRequests() : 0;
_stats.connectionClosed(duration, requests, requests);
_stats.connectionClosed(duration, connection.getMessagesIn(), connection.getMessagesOut());
}
public void connectionUpgraded(Connection oldConnection, Connection newConnection)
{
oldConnection.onClose();
_stats.connectionUpgraded(oldConnection.getMessagesIn(), oldConnection.getMessagesOut());
newConnection.onOpen();
}
@Override

View File

@ -197,11 +197,10 @@ public interface Connector extends LifeCycle, Graceful
/**
* <p>Callback method invoked when a connection is upgraded.</p>
*
* @param duration the time the previous connection was opened
* @param messagesIn the number of messages received by the previous connection
* @param messagesOut the number of messages send by the previous connection
*/
public void connectionUpgraded(long duration, int messagesIn, int messagesOut);
public void connectionUpgraded(int messagesIn, int messagesOut);
/**
* <p>Callback method invoked when a connection is closed.</p>

View File

@ -162,7 +162,7 @@ class ConnectorStatistics extends AbstractLifeCycle implements Statistics, Dumpa
}
@Override
public void connectionUpgraded(long duration, int messagesIn, int messagesOut)
public void connectionUpgraded(int messagesIn, int messagesOut)
{
if (isStarted())
{

View File

@ -136,6 +136,19 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_chunk=null;
}
}
@Override
public int getMessagesIn()
{
return getHttpChannel().getRequests();
}
@Override
public int getMessagesOut()
{
return getHttpChannel().getRequests();
}
@Override
public String toString()
@ -433,6 +446,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
((AbstractConnector)getConnector()).connectionUpgraded(this,connection);
}
}

View File

@ -168,7 +168,6 @@ public class LocalConnector extends AbstractConnector
Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
endPoint.setConnection(connection);
connectionOpened(connection);
connection.onOpen();
}
public class LocalEndPoint extends ByteArrayEndPoint
@ -197,7 +196,6 @@ public class LocalConnector extends AbstractConnector
if (was_open)
{
connectionClosed(getConnection());
getConnection().onClose();
onClose();
}
}
@ -261,6 +259,5 @@ public class LocalConnector extends AbstractConnector
}
}
}
}
}

View File

@ -261,16 +261,6 @@ public class SelectChannelConnector extends AbstractNetworkConnector
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout());
}
protected void endPointClosed(EndPoint endpoint)
{
connectionClosed(endpoint.getConnection());
}
protected Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
{
return getDefaultConnectionFactory().newConnection(this, endPoint);
}
/**
* @return the linger time
* @see Socket#getSoLinger()
@ -340,23 +330,14 @@ public class SelectChannelConnector extends AbstractNetworkConnector
public void connectionOpened(Connection connection)
{
SelectChannelConnector.this.connectionOpened(connection);
super.connectionOpened(connection);
}
@Override
public void connectionClosed(Connection connection)
{
super.connectionClosed(connection);
SelectChannelConnector.this.connectionClosed(connection);
}
@Override
public void connectionUpgraded(EndPoint endpoint, Connection oldConnection)
{
SelectChannelConnector.this.connectionUpgraded(oldConnection, endpoint.getConnection());
super.connectionUpgraded(endpoint, oldConnection);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
@ -366,7 +347,7 @@ public class SelectChannelConnector extends AbstractNetworkConnector
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return SelectChannelConnector.this.newConnection(channel, endpoint, attachment);
return getDefaultConnectionFactory().newConnection(SelectChannelConnector.this, endpoint);
}
}

View File

@ -85,8 +85,8 @@ public class SslConnectionFactory extends AbstractConnectionFactory
ConnectionFactory next = connector.getConnectionFactory(_nextProtocol);
Connection connection = next.newConnection(connector, decrypted_endp);
decrypted_endp.setConnection(connection);
((AbstractConnector)connector).connectionOpened(connection);
return sslConnection;
}

View File

@ -98,10 +98,4 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
return endPoint;
}
@Override
protected void endPointClosed(EndPoint endpoint)
{
super.endPointClosed(endpoint);
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();
}
}

View File

@ -66,17 +66,9 @@ public class SelectChannelStatisticsTest
_server = new Server();
_connector = new SelectChannelConnector(_server)
{
@Override
protected void endPointClosed(EndPoint endpoint)
{
//System.err.println("Endpoint closed "+endpoint);
super.endPointClosed(endpoint);
}
@Override
public void connectionClosed(Connection connection)
{
//System.err.println("Connection closed "+connection);
super.connectionClosed(connection);
_closed.countDown();
}

View File

@ -23,9 +23,13 @@ import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -37,20 +41,31 @@ public class NextProtoNegoClientConnection extends AbstractConnection implements
private final SocketChannel channel;
private final Object attachment;
private final SPDYClient client;
private final SSLEngine engine;
private volatile boolean completed;
public NextProtoNegoClientConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
public NextProtoNegoClientConnection(SocketChannel channel, DecryptedEndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
{
super(endPoint, executor);
this.channel = channel;
this.attachment = attachment;
this.client = client;
this.engine=endPoint.getSslConnection().getSSLEngine();
NextProtoNego.put(engine, this);
}
@Override
public void onOpen()
{
super.onOpen();
try
{
getEndPoint().flush(BufferUtil.EMPTY_BUFFER);
}
catch(IOException e)
{
throw new RuntimeIOException(e);
}
fillInterested();
}
@ -90,6 +105,7 @@ public class NextProtoNegoClientConnection extends AbstractConnection implements
@Override
public void unsupported()
{
NextProtoNego.remove(engine);
// Server does not support NPN, but this is a SPDY client, so hardcode SPDY
EndPoint endPoint = getEndPoint();
Connection connection = client.getConnectionFactory().newConnection(channel, endPoint, attachment);
@ -100,6 +116,7 @@ public class NextProtoNegoClientConnection extends AbstractConnection implements
@Override
public String selectProtocol(List<String> protocols)
{
NextProtoNego.remove(engine);
String protocol = client.selectProtocol(protocols);
if (protocol == null)
return null;

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
@ -65,7 +66,6 @@ public class NextProtoNegoServerConnection extends AbstractConnection implements
@Override
public void onClose()
{
NextProtoNego.remove(engine);
super.onClose();
};
@ -111,11 +111,12 @@ public class NextProtoNegoServerConnection extends AbstractConnection implements
@Override
public void protocolSelected(String protocol)
{
NextProtoNego.remove(engine);
ConnectionFactory connectionFactory = connector.getConnectionFactory(protocol);
EndPoint endPoint = getEndPoint();
Connection connection = connectionFactory.newConnection(connector, endPoint);
endPoint.setConnection(connection);
NextProtoNego.remove(engine);
((AbstractConnector)connector).connectionUpgraded(this,connection);
completed = true;
}
}

View File

@ -40,7 +40,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
@ -154,9 +154,9 @@ public class SPDYClient
public void replaceConnection(EndPoint endPoint, Connection connection)
{
Connection oldConnection = endPoint.getConnection();
endPoint.getConnection().onClose();
endPoint.setConnection(connection);
factory.selector.connectionUpgraded(endPoint, oldConnection);
connection.onOpen();
}
public static class Factory extends AggregateLifeCycle
@ -277,30 +277,15 @@ public class SPDYClient
if (sslContextFactory != null)
{
final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(bufferPool, executor, endPoint, engine)
{
@Override
public void onClose()
{
NextProtoNego.remove(engine);
super.onClose();
}
};
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
SslConnection sslConnection = new SslConnection(bufferPool, executor, endPoint, engine);
DecryptedEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(channel, sslEndPoint, attachment, client.factory.executor, client);
sslEndPoint.setConnection(connection);
connectionOpened(connection);
NextProtoNego.put(engine, connection);
return sslConnection;
}
else
{
SPDYClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
return connectionFactory.newConnection(channel, endPoint, attachment);
}
SPDYClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
return connectionFactory.newConnection(channel, endPoint, attachment);
}
catch (RuntimeException x)
{
@ -308,6 +293,19 @@ public class SPDYClient
throw x;
}
}
@Override
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(Connection connection)
{
connection.onClose();
}
}
}

View File

@ -109,7 +109,7 @@ public abstract class AbstractTest
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
}
return clientFactory.newSPDYClient(version).connect(socketAddress, listener).get(5, TimeUnit.SECONDS);
}

View File

@ -143,4 +143,16 @@ public class WebSocketClientSelectorManager extends SelectorManager
{
this.sslContextFactory = sslContextFactory;
}
@Override
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@Override
public void connectionClosed(Connection connection)
{
connection.onClose();
}
}