Jetty9 - Dispatching the onConnect(Session) callback in a different thread.

This is needed for two reasons: the first is that onOpen() is called from the selector thread
and onConnect() may perform I/O (such as sending a SETTINGS frame), and second
because onOpen() may be called when NPN upgrades the connection.
In the latter case, SslConnection is busy with the SSL handshake (for example is filling)
and calling onConnect() triggers a write, that ends up in the SslConnection that sees that
it is still in the SSL handshake and needs to fill, resulting in a reentrant fill, which is not
supported correctly by SslConnection.
Dispatching to a new thread makes the call safe, since SslConnection is properly synchronized.
This commit is contained in:
Simone Bordet 2012-08-28 15:20:14 +02:00
parent 2339da7cc0
commit 7a2b53d987
4 changed files with 38 additions and 30 deletions

View File

@ -206,9 +206,10 @@ public class SslConnection extends AbstractConnection
@Override @Override
public String toString() public String toString()
{ {
return String.format("SslConnection@%x{%s}", return String.format("SslConnection@%x{%s} -> %s",
hashCode(), hashCode(),
_sslEngine.getHandshakeStatus()); _sslEngine.getHandshakeStatus(),
_decryptedEndPoint.getConnection());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -285,7 +286,7 @@ public class SslConnection extends AbstractConnection
{ {
return getEndPoint(); return getEndPoint();
} }
@Override @Override
protected FillInterest getFillInterest() protected FillInterest getFillInterest()
{ {
@ -460,7 +461,7 @@ public class SslConnection extends AbstractConnection
case NEED_WRAP: case NEED_WRAP:
// we need to send some handshake data (probably to send a close handshake). // we need to send some handshake data (probably to send a close handshake).
// If we were called from flush, // If we were called from flush,
if (buffer==__FLUSH_CALLED_FILL) if (buffer==__FLUSH_CALLED_FILL)
return -1; // it can deal with the close handshake return -1; // it can deal with the close handshake
@ -523,11 +524,11 @@ public class SslConnection extends AbstractConnection
case NEED_WRAP: case NEED_WRAP:
// we need to send some handshake data // we need to send some handshake data
// if we are called from flush // if we are called from flush
if (buffer==__FLUSH_CALLED_FILL) if (buffer==__FLUSH_CALLED_FILL)
return 0; // let it do the wrapping return 0; // let it do the wrapping
_fillRequiresFlushToProgress = true; _fillRequiresFlushToProgress = true;
flush(__FILL_CALLED_FLUSH); flush(__FILL_CALLED_FLUSH);
if (BufferUtil.isEmpty(_encryptedOutput)) if (BufferUtil.isEmpty(_encryptedOutput))
@ -642,7 +643,7 @@ public class SslConnection extends AbstractConnection
if (BufferUtil.isEmpty(b)) if (BufferUtil.isEmpty(b))
BufferUtil.clear(b); BufferUtil.clear(b);
} }
// and deal with the results returned from the sslEngineWrap // and deal with the results returned from the sslEngineWrap
switch (wrapResult.getStatus()) switch (wrapResult.getStatus())
{ {
@ -778,7 +779,7 @@ public class SslConnection extends AbstractConnection
{ {
return _sslEngine.isInboundDone(); return _sslEngine.isInboundDone();
} }
@Override @Override
public String toString() public String toString()
{ {

View File

@ -60,7 +60,7 @@ public class NextProtoNegoServerConnection extends AbstractConnection implements
int filled = fill(); int filled = fill();
if (filled == 0 && !completed) if (filled == 0 && !completed)
fillInterested(); fillInterested();
if (filled <= 0) if (filled <= 0 || completed)
break; break;
} }
} }

View File

@ -350,9 +350,7 @@ public class SPDYClient
else else
{ {
ConnectionFactory connectionFactory = new ClientSPDYConnectionFactory(); ConnectionFactory connectionFactory = new ClientSPDYConnectionFactory();
Connection connection = connectionFactory.newConnection(channel, endPoint, attachment); return connectionFactory.newConnection(channel, endPoint, attachment);
endPoint.setConnection(connection);
return connection;
} }
} }
catch (RuntimeException x) catch (RuntimeException x)

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.spdy;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
@ -33,21 +34,21 @@ import org.eclipse.jetty.spdy.parser.Parser;
public class ServerSPDYConnectionFactory implements ConnectionFactory public class ServerSPDYConnectionFactory implements ConnectionFactory
{ {
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final Executor threadPool; private final Executor executor;
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;
private final short version; private final short version;
private final ServerSessionFrameListener listener; private final ServerSessionFrameListener listener;
public ServerSPDYConnectionFactory(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler) public ServerSPDYConnectionFactory(short version, ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler)
{ {
this(version, bufferPool, threadPool, scheduler, null); this(version, bufferPool, executor, scheduler, null);
} }
public ServerSPDYConnectionFactory(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, ServerSessionFrameListener listener) public ServerSPDYConnectionFactory(short version, ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler, ServerSessionFrameListener listener)
{ {
this.version = version; this.version = version;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.threadPool = threadPool; this.executor = executor;
this.scheduler = scheduler; this.scheduler = scheduler;
this.listener = listener; this.listener = listener;
} }
@ -71,8 +72,8 @@ public class ServerSPDYConnectionFactory implements ConnectionFactory
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version); FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy); StandardSession session = new StandardSession(getVersion(), getBufferPool(), getExecutor(), getScheduler(), connection, connection, 2, listener, generator, flowControlStrategy);
session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddress()); session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddress()); // TODO: make this available through API
session.setWindowSize(connector.getInitialWindowSize()); session.setWindowSize(connector.getInitialWindowSize());
parser.addListener(session); parser.addListener(session);
connection.setSession(session); connection.setSession(session);
@ -92,16 +93,21 @@ public class ServerSPDYConnectionFactory implements ConnectionFactory
return bufferPool; return bufferPool;
} }
protected Executor getThreadPool() protected Executor getExecutor()
{ {
return threadPool; return executor;
} }
private static class ServerSPDYConnection extends SPDYConnection public ScheduledExecutorService getScheduler()
{
return scheduler;
}
private static class ServerSPDYConnection extends SPDYConnection implements Runnable
{ {
private final ServerSessionFrameListener listener; private final ServerSessionFrameListener listener;
private final SPDYServerConnector connector; private final SPDYServerConnector connector;
private volatile boolean connected; private final AtomicBoolean connected = new AtomicBoolean();
private ServerSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) private ServerSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{ {
@ -113,14 +119,9 @@ public class ServerSPDYConnectionFactory implements ConnectionFactory
@Override @Override
public void onOpen() public void onOpen()
{ {
if (!connected)
{
// NPE guard to support tests
if (listener != null)
listener.onConnect(getSession());
connected = true;
}
super.onOpen(); super.onOpen();
if (connected.compareAndSet(false, true))
getExecutor().execute(this);
} }
@Override @Override
@ -129,5 +130,13 @@ public class ServerSPDYConnectionFactory implements ConnectionFactory
super.onClose(); super.onClose();
connector.sessionClosed(getSession()); connector.sessionClosed(getSession());
} }
@Override
public void run()
{
// NPE guard to support tests
if (listener != null)
listener.onConnect(getSession());
}
} }
} }