Attempting to come to grips with proper close/disconnect/stop/cleanup

This commit is contained in:
Joakim Erdfelt 2012-07-27 13:48:34 -07:00
parent f2c86b2d00
commit c2aae7c517
8 changed files with 145 additions and 64 deletions

View File

@ -59,7 +59,7 @@ public class WebSocketEventDriver implements IncomingFrames
private final WebSocketPolicy policy;
private final EventMethods events;
private final ByteBufferPool bufferPool;
private WebSocketSession connection;
private WebSocketSession session;
private ByteBuffer activeMessage;
private StreamAppender activeStream;
@ -150,7 +150,7 @@ public class WebSocketEventDriver implements IncomingFrames
if (events.onException != null)
{
events.onException.call(websocket,connection,e);
events.onException.call(websocket,session,e);
}
}
@ -171,7 +171,7 @@ public class WebSocketEventDriver implements IncomingFrames
// Generic Read-Only Frame version
if ((frame instanceof Frame) && (events.onFrame != null))
{
events.onFrame.call(websocket,connection,frame);
events.onFrame.call(websocket,session,frame);
// DO NOT return; - as this is just a read-only notification.
}
@ -184,7 +184,7 @@ public class WebSocketEventDriver implements IncomingFrames
CloseInfo close = new CloseInfo(frame);
if (events.onClose != null)
{
events.onClose.call(websocket,connection,close.getStatusCode(),close.getReason());
events.onClose.call(websocket,session,close.getStatusCode(),close.getReason());
}
throw new CloseException(close.getStatusCode(),close.getReason());
}
@ -200,7 +200,7 @@ public class WebSocketEventDriver implements IncomingFrames
BufferUtil.flipToFlush(pongBuf,0);
pong.setPayload(pongBuf);
}
connection.output("pong",new FutureCallback<String>(),pong);
session.output("pong",new FutureCallback<String>(),pong);
break;
}
case BINARY:
@ -230,7 +230,7 @@ public class WebSocketEventDriver implements IncomingFrames
if (needsNotification)
{
events.onBinary.call(websocket,connection,activeStream);
events.onBinary.call(websocket,session,activeStream);
}
if (frame.isFin())
@ -261,7 +261,7 @@ public class WebSocketEventDriver implements IncomingFrames
{
BufferUtil.flipToFlush(activeMessage,0);
byte buf[] = BufferUtil.toArray(activeMessage);
events.onBinary.call(websocket,connection,buf,0,buf.length);
events.onBinary.call(websocket,session,buf,0,buf.length);
}
finally
{
@ -300,7 +300,7 @@ public class WebSocketEventDriver implements IncomingFrames
if (needsNotification)
{
events.onText.call(websocket,connection,activeStream);
events.onText.call(websocket,session,activeStream);
}
if (frame.isFin())
@ -335,7 +335,7 @@ public class WebSocketEventDriver implements IncomingFrames
// TODO: FIX EVIL COPY
utf.append(data,0,data.length);
events.onText.call(websocket,connection,utf.toString());
events.onText.call(websocket,session,utf.toString());
}
finally
{
@ -371,7 +371,7 @@ public class WebSocketEventDriver implements IncomingFrames
{
LOG.debug("{}.onConnect()",websocket.getClass().getSimpleName());
}
events.onConnect.call(websocket,connection);
events.onConnect.call(websocket,session);
}
/**
@ -380,9 +380,9 @@ public class WebSocketEventDriver implements IncomingFrames
* @param conn
* the connection
*/
public void setConnection(WebSocketSession conn)
public void setSession(WebSocketSession conn)
{
this.connection = conn;
this.session = conn;
}
private void terminateConnection(int statusCode, String rawreason)
@ -399,7 +399,7 @@ public class WebSocketEventDriver implements IncomingFrames
}
}
LOG.debug("terminateConnection({},{})",statusCode,rawreason);
connection.close(statusCode,reason);
session.close(statusCode,reason);
}
catch (IOException e)
{
@ -407,6 +407,12 @@ public class WebSocketEventDriver implements IncomingFrames
}
}
@Override
public String toString()
{
return websocket.getClass().getName();
}
private void unhandled(Throwable t)
{
socketLog.warn("Unhandled Error (closing connection)",t);

View File

@ -47,7 +47,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io
*/
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames
public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames
{
static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final ThreadLocal<WebSocketAsyncConnection> CURRENT_CONNECTION = new ThreadLocal<WebSocketAsyncConnection>();
@ -68,6 +68,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private final Parser parser;
private final WebSocketPolicy policy;
private final FrameQueue queue;
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean flushing;
private AtomicLong writes;
@ -202,19 +203,17 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
return scheduler;
}
public WebSocketSession getSession()
{
return session;
}
@Override
public boolean isOpen()
{
return getEndPoint().isOpen();
}
@Override
public void onClose()
{
LOG.debug("onClose()");
super.onClose();
}
@Override
public void onFillable()
{
@ -233,14 +232,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
}
@Override
public void onOpen()
{
super.onOpen();
// TODO: websocket.setConnection(this);
// TODO: websocket.onConnect();
}
/**
* Enqueue internal frame from {@link OutgoingFrames} stack for eventual write out on the physical connection.
*/
@ -317,6 +308,11 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
this.extensions = extensions;
}
public void setSession(WebSocketSession session)
{
this.session = session;
}
/**
* For terminating connections forcefully.
*

View File

@ -93,6 +93,13 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
return connection.isOpen();
}
public void onConnect()
{
LOG.debug("onConnect()");
websocket.setSession(this);
websocket.onConnect();
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
@ -120,6 +127,22 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
this.outgoing = outgoing;
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("WebSocketSession[websocket=");
builder.append(websocket);
builder.append(",connection=");
builder.append(connection);
builder.append(",subprotocol=");
builder.append(subprotocol);
builder.append(",outgoing=");
builder.append(outgoing);
builder.append("]");
return builder.toString();
}
/**
* {@inheritDoc}
*/

View File

@ -58,7 +58,7 @@ public class WebSocketEventDriverTest
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn);
driver.setSession(conn);
driver.onConnect();
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
@ -74,7 +74,7 @@ public class WebSocketEventDriverTest
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn);
driver.setSession(conn);
driver.onConnect();
driver.incoming(makeBinaryFrame("Hello World",true));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
@ -92,7 +92,7 @@ public class WebSocketEventDriverTest
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn);
driver.setSession(conn);
driver.onConnect();
driver.incoming(new WebSocketFrame(OpCode.PING).setPayload("PING"));
driver.incoming(WebSocketFrame.text("Text Me"));
@ -115,7 +115,7 @@ public class WebSocketEventDriverTest
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn);
driver.setSession(conn);
driver.onConnect();
driver.incoming(makeBinaryFrame("Hello World",true));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
@ -133,7 +133,7 @@ public class WebSocketEventDriverTest
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn);
driver.setSession(conn);
driver.onConnect();
driver.incoming(WebSocketFrame.text("Hello World"));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());

View File

@ -0,0 +1,41 @@
package org.eclipse.jetty.websocket.server;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
public class WebSocketServerAsyncConnection extends WebSocketAsyncConnection
{
private final WebSocketServerFactory factory;
private boolean connected;
public WebSocketServerAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
ByteBufferPool bufferPool, WebSocketServerFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);
this.factory = factory;
this.connected = false;
}
@Override
public void onClose()
{
super.onClose();
factory.sessionClosed(getSession());
}
@Override
public void onOpen()
{
if (!connected)
{
factory.sessionOpened(getSession());
connected = true;
}
super.onOpen();
}
}

View File

@ -50,7 +50,6 @@ import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
@ -62,7 +61,6 @@ import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
public class WebSocketServerFactory extends AbstractLifeCycle implements WebSocketCreator
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private final Queue<WebSocketAsyncConnection> connections = new ConcurrentLinkedQueue<WebSocketAsyncConnection>();
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
{
@ -70,6 +68,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
handshakes.put(HandshakeHixie76.VERSION,new HandshakeHixie76());
}
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
/**
* Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
*/
@ -142,17 +141,20 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
return upgrade(sockreq,sockresp,websocket);
}
protected boolean addConnection(WebSocketAsyncConnection connection)
{
return isRunning() && connections.add(connection);
}
protected void closeConnections()
{
for (WebSocketAsyncConnection connection : connections)
for (WebSocketSession session : sessions)
{
connection.getEndPoint().close();
try
{
session.close();
}
catch (IOException e)
{
LOG.warn("Unable to close session",e);
}
}
sessions.clear();
}
@Override
@ -182,6 +184,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
}
public WebSocketCreator getCreator()
@ -278,9 +281,25 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
methodsCache.register(websocketClass);
}
protected boolean removeConnection(WebSocketAsyncConnection connection)
public boolean sessionClosed(WebSocketSession session)
{
return connections.remove(connection);
return isRunning() && sessions.remove(session);
}
public boolean sessionOpened(WebSocketSession session)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Session Opened: {}",session);
}
if (!isRunning())
{
LOG.warn("Factory is not running");
return false;
}
boolean ret = sessions.offer(session);
session.onConnect();
return ret;
}
public void setCreator(WebSocketCreator creator)
@ -337,7 +356,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
AsyncEndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().findExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endp,executor,scheduler,websocket.getPolicy(),bufferPool);
WebSocketServerAsyncConnection connection = new WebSocketServerAsyncConnection(endp,executor,scheduler,websocket.getPolicy(),bufferPool,this);
// Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTR,connection);
@ -346,6 +365,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Initialize / Negotiate Extensions
WebSocketSession session = new WebSocketSession(websocket,connection,getPolicy(),response.getAcceptedSubProtocol());
connection.setSession(session);
List<Extension> extensions = initExtensions(request.getExtensions());
// Start with default routing.
@ -399,14 +419,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
LOG.debug("Handshake Response: {}",handshaker);
handshaker.doHandshakeResponse(request,response);
// Add connection
addConnection(connection);
// Notify POJO of connection
// TODO move to WebSocketAsyncConnection.onOpen
websocket.setConnection(session);
websocket.onConnect();
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
return true;
}

View File

@ -16,6 +16,7 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -34,14 +35,14 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
* appropriate conditions.
* <p>
* The most basic implementation would be as follows.
*
*
* <pre>
* package my.example;
*
*
* import javax.servlet.http.HttpServletRequest;
* import org.eclipse.jetty.websocket.WebSocket;
* import org.eclipse.jetty.websocket.server.WebSocketServlet;
*
*
* public class MyEchoServlet extends WebSocketServlet
* {
* &#064;Override
@ -51,29 +52,29 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
* }
* }
* </pre>
*
*
* Note: that only request that conforms to a "WebSocket: Upgrade" handshake request will trigger the {@link WebSocketServerFactory} handling of creating
* WebSockets.<br>
* All other requests are treated as normal servlet requests.
*
*
* <p>
* <b>Configuration / Init-Parameters:</b><br>
* Note: If you use the {@link WebSocket &#064;WebSocket} annotation, these configuration settings can be specified on a per WebSocket basis, vs a per Servlet
* basis.
*
*
* <dl>
* <dt>bufferSize</dt>
* <dd>can be used to set the buffer size, which is also the max frame byte size<br>
* <i>Default: 8192</i></dd>
*
*
* <dt>maxIdleTime</dt>
* <dd>set the time in ms that a websocket may be idle before closing<br>
* <i>Default:</i></dd>
*
*
* <dt>maxTextMessagesSize</dt>
* <dd>set the size in characters that a websocket may be accept before closing<br>
* <i>Default:</i></dd>
*
*
* <dt>maxBinaryMessagesSize</dt>
* <dd>set the size in bytes that a websocket may be accept before closing<br>
* <i>Default:</i></dd>
@ -134,6 +135,8 @@ public abstract class WebSocketServlet extends HttpServlet
webSocketFactory = new WebSocketServerFactory(policy);
registerWebSockets(webSocketFactory);
webSocketFactory.start();
}
catch (Exception x)
{

View File

@ -115,7 +115,7 @@ public class TestABCase1 extends AbstractABCase
client.flush();
// Read frames
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,500);
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,1000);
// Validate echo'd frame
WebSocketFrame frame = frames.remove();