481986 - Dead JSR 356 Server Session still being tracked after Session/Connection closure

+ Removing SessionListener
+ Work in CDI layer for WebSocketContainerScope is reused for
  session tracking on the parent scope of the WebSocketSession only.
  no more multi-listener behavior
+ Reworked JsrSession ID behavior to not be based on AtomicLong
+ AbstractWebSocketConnection now has .hashCode and .equals
This commit is contained in:
Joakim Erdfelt 2016-01-12 16:15:51 -07:00
parent 75b84374e0
commit 57224ec3ca
18 changed files with 328 additions and 103 deletions

View File

@ -53,8 +53,6 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -73,7 +71,7 @@ import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
* <p>
* This should be specific to a JVM if run in a standalone mode. or specific to a WebAppContext if running on the Jetty server.
*/
public class ClientContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketContainerScope, SessionListener
public class ClientContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketContainerScope
{
private static final Logger LOG = Log.getLogger(ClientContainer.class);
@ -105,8 +103,7 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
this.scopeDelegate = scope;
client = new WebSocketClient(scope, new SslContextFactory(trustAll));
client.setEventDriverFactory(new JsrEventDriverFactory(client.getPolicy()));
SessionFactory sessionFactory = new JsrSessionFactory(this,this,client);
client.setSessionFactory(sessionFactory);
client.setSessionFactory(new JsrSessionFactory(this));
addBean(client);
this.endpointClientMetadataCache = new ConcurrentHashMap<>();

View File

@ -44,7 +44,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.endpoints.AbstractJsrEventDriver;
@ -74,9 +73,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
public JsrSession(ClientContainer container, String id, URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners)
public JsrSession(ClientContainer container, String id, URI requestURI, EventDriver websocket, LogicalConnection connection)
{
super(container, requestURI, websocket, connection, sessionListeners);
super(container, requestURI, websocket, connection);
if (!(websocket instanceof AbstractJsrEventDriver))
{
throw new IllegalArgumentException("Cannot use, not a JSR WebSocket: " + websocket);

View File

@ -19,36 +19,32 @@
package org.eclipse.jetty.websocket.jsr356;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.endpoints.AbstractJsrEventDriver;
public class JsrSessionFactory implements SessionFactory
{
private AtomicLong idgen = new AtomicLong(0);
private static final Logger LOG = Log.getLogger(JsrSessionFactory.class);
private final ClientContainer container;
private final SessionListener[] listeners;
public JsrSessionFactory(ClientContainer container, SessionListener... sessionListeners)
public JsrSessionFactory(ClientContainer container)
{
if(LOG.isDebugEnabled()) {
LOG.debug("Container: {}", container);
}
this.container = container;
this.listeners = sessionListeners;
}
@Override
public WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
{
return new JsrSession(container,getNextId(),requestURI,websocket,connection,listeners);
}
public String getNextId()
{
return String.format("websocket-%d",idgen.incrementAndGet());
return new JsrSession(container,connection.getId(),requestURI,websocket,connection);
}
@Override

View File

@ -30,6 +30,7 @@ import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.JsrSessionFactory;
@ -56,7 +57,7 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
EventDriverFactory eventDriverFactory = this.webSocketServerFactory.getEventDriverFactory();
eventDriverFactory.addImplementation(new JsrServerEndpointImpl());
eventDriverFactory.addImplementation(new JsrServerExtendsEndpointImpl());
this.webSocketServerFactory.addSessionFactory(new JsrSessionFactory(this,this));
this.webSocketServerFactory.addSessionFactory(new JsrSessionFactory(this));
addBean(webSocketServerFactory);
}
@ -240,4 +241,16 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
// incoming streaming buffer size
webSocketServerFactory.getPolicy().setMaxTextMessageBufferSize(max);
}
@Override
public void onSessionClosed(WebSocketSession session)
{
webSocketServerFactory.onSessionClosed(session);
}
@Override
public void onSessionOpened(WebSocketSession session)
{
webSocketServerFactory.onSessionOpened(session);
}
}

View File

@ -0,0 +1,183 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class SessionTrackingTest
{
public static class ClientSocket extends Endpoint
{
public Session session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
@Override
public void onOpen(Session session, EndpointConfig config)
{
this.session = session;
openLatch.countDown();
}
@Override
public void onClose(Session session, CloseReason closeReason)
{
closeLatch.countDown();
}
public void waitForOpen(long timeout, TimeUnit unit) throws InterruptedException
{
assertThat("ClientSocket opened",openLatch.await(timeout,unit),is(true));
}
public void waitForClose(long timeout, TimeUnit unit) throws InterruptedException
{
assertThat("ClientSocket opened",closeLatch.await(timeout,unit),is(true));
}
}
@ServerEndpoint("/test")
public static class EchoSocket
{
@OnMessage
public String echo(String msg)
{
return msg;
}
}
private static Server server;
private static WebSocketServerFactory wsServerFactory;
private static URI serverURI;
@BeforeClass
public static void startServer() throws Exception
{
Server server = new Server();
ServerConnector serverConnector = new ServerConnector(server);
serverConnector.setPort(0);
server.addConnector(serverConnector);
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath("/");
server.setHandler(servletContextHandler);
ServerContainer serverContainer = WebSocketServerContainerInitializer.configureContext(servletContextHandler);
serverContainer.addEndpoint(EchoSocket.class);
wsServerFactory = serverContainer.getBean(WebSocketServerFactory.class);
server.start();
String host = serverConnector.getHost();
if (StringUtil.isBlank(host))
{
host = "localhost";
}
serverURI = new URI("ws://" + host + ":" + serverConnector.getLocalPort());
}
@AfterClass
public static void stopServer() throws Exception
{
if (server == null)
{
return;
}
server.stop();
}
@Test
public void testAddRemoveSessions() throws Exception
{
// Create Client
ClientContainer clientContainer = new ClientContainer();
try
{
clientContainer.start();
// Establish connections
ClientSocket cli1 = new ClientSocket();
clientContainer.connectToServer(cli1,serverURI.resolve("/test"));
cli1.waitForOpen(1,TimeUnit.SECONDS);
// Assert open connections
assertServerOpenConnectionCount(1);
// Establish new connection
ClientSocket cli2 = new ClientSocket();
clientContainer.connectToServer(cli2,serverURI.resolve("/test"));
cli2.waitForOpen(1,TimeUnit.SECONDS);
// Assert open connections
assertServerOpenConnectionCount(2);
// Establish close both connections
cli1.session.close();
cli2.session.close();
cli1.waitForClose(1,TimeUnit.SECONDS);
cli2.waitForClose(1,TimeUnit.SECONDS);
// Assert open connections
assertServerOpenConnectionCount(0);
}
finally
{
clientContainer.stop();
}
}
private void assertServerOpenConnectionCount(int expectedCount)
{
Collection<WebSocketSession> sessions = wsServerFactory.getBeans(WebSocketSession.class);
int openCount = 0;
for (WebSocketSession session : sessions)
{
assertThat("Session.isopen: " + session,session.isOpen(),is(true));
openCount++;
}
assertThat("Open Session Count",openCount,is(expectedCount));
}
}

View File

@ -6,6 +6,9 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
org.eclipse.jetty.websocket.jsr356.LEVEL=DEBUG
### Show state changes on BrowserDebugTool
# -- LEAVE THIS AT DEBUG LEVEL --
org.eclipse.jetty.websocket.jsr356.server.browser.LEVEL=DEBUG

View File

@ -53,7 +53,6 @@ import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriver;
@ -64,7 +63,7 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
/**
* WebSocketClient provides a means of establishing connections to remote websocket endpoints.
*/
public class WebSocketClient extends ContainerLifeCycle implements SessionListener, WebSocketContainerScope
public class WebSocketClient extends ContainerLifeCycle implements WebSocketContainerScope
{
private static final Logger LOG = Log.getLogger(WebSocketClient.class);

View File

@ -22,7 +22,6 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -155,4 +154,10 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* @return the suspend token
*/
SuspendToken suspend();
/**
* Get Unique ID for the Connection
* @return the unique ID for the connection
*/
public String getId();
}

View File

@ -1,31 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
/**
* Basic listener interface for Session open/close.
* <p>
* Used primarily for tracking open sessions.
*/
public interface SessionListener
{
public void onSessionOpened(WebSocketSession session);
public void onSessionClosed(WebSocketSession session);
}

View File

@ -67,7 +67,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
private final URI requestURI;
private final LogicalConnection connection;
private final EventDriver websocket;
private final SessionListener[] sessionListeners;
private final Executor executor;
private ClassLoader classLoader;
private ExtensionFactory extensionFactory;
@ -80,7 +79,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners)
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection)
{
Objects.requireNonNull(containerScope,"Container Scope cannot be null");
Objects.requireNonNull(requestURI,"Request URI cannot be null");
@ -90,7 +89,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
this.requestURI = requestURI;
this.websocket = websocket;
this.connection = connection;
this.sessionListeners = sessionListeners;
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
this.incomingHandler = websocket;
@ -435,36 +433,28 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
CloseInfo close = ioState.getCloseInfo();
// confirmed close of local endpoint
notifyClose(close.getStatusCode(),close.getReason());
// notify session listeners
for (SessionListener listener : sessionListeners)
try
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName());
listener.onSessionClosed(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionClosed()",containerScope.getClass().getSimpleName());
containerScope.onSessionClosed(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
break;
case CONNECTED:
// notify session listeners
for (SessionListener listener : sessionListeners)
try
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionOpen()", listener.getClass().getSimpleName());
listener.onSessionOpened(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionOpened()",containerScope.getClass().getSimpleName());
containerScope.onSessionOpened(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
break;
}

View File

@ -31,26 +31,10 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
public class WebSocketSessionFactory implements SessionFactory
{
private final WebSocketContainerScope containerScope;
private final SessionListener[] listeners;
public WebSocketSessionFactory(WebSocketContainerScope containerScope, SessionListener... sessionListeners)
public WebSocketSessionFactory(WebSocketContainerScope containerScope)
{
this.containerScope = containerScope;
if ((sessionListeners != null) && (sessionListeners.length > 0))
{
this.listeners = sessionListeners;
}
else
{
if (this.containerScope instanceof SessionListener)
{
this.listeners = new SessionListener[] { (SessionListener)containerScope };
}
else
{
this.listeners = new SessionListener[0];
}
}
}
@Override
@ -62,6 +46,6 @@ public class WebSocketSessionFactory implements SessionFactory
@Override
public WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
{
return new WebSocketSession(containerScope, requestURI,websocket,connection,listeners);
return new WebSocketSession(containerScope, requestURI,websocket,connection);
}
}

View File

@ -214,6 +214,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final WebSocketPolicy policy;
private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private final String id;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer prefillBuffer;
@ -224,6 +225,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor);
this.id = String.format("%s:%d->%s:%d",
endp.getLocalAddress().getAddress().getHostAddress(),
endp.getLocalAddress().getPort(),
endp.getRemoteAddress().getAddress().getHostAddress(),
endp.getRemoteAddress().getPort());
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);
@ -347,6 +353,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
return generator;
}
@Override
public String getId()
{
return id;
}
@Override
public long getIdleTimeout()
@ -747,6 +759,43 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
EndPoint endp = getEndPoint();
if(endp != null)
{
result = prime * result + endp.getLocalAddress().hashCode();
result = prime * result + endp.getRemoteAddress().hashCode();
}
return result;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AbstractWebSocketConnection other = (AbstractWebSocketConnection)obj;
EndPoint endp = getEndPoint();
EndPoint otherEndp = other.getEndPoint();
if (endp == null)
{
if (otherEndp != null)
return false;
}
else if (!endp.equals(otherEndp))
return false;
return true;
}
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.WebSocketSession;
public class SimpleContainerScope extends ContainerLifeCycle implements WebSocketContainerScope
{
@ -105,4 +106,14 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
{
this.sslContextFactory = sslContextFactory;
}
@Override
public void onSessionOpened(WebSocketSession session)
{
}
@Override
public void onSessionClosed(WebSocketSession session)
{
}
}

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.WebSocketSession;
/**
* Defined Scope for a WebSocketContainer.
@ -64,4 +65,19 @@ public interface WebSocketContainerScope
* @return the SslContextFactory in use by the container (can be null if no SSL context is defined)
*/
public SslContextFactory getSslContextFactory();
/**
* A Session has been opened
*
* @param the session that was opened
*/
public void onSessionOpened(WebSocketSession session);
/**
* A Session has been closed
*
* @param the session that was closed
*/
public void onSessionClosed(WebSocketSession session);
}

View File

@ -107,6 +107,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return this.bufferPool;
}
@Override
public String getId()
{
return this.id;
}
@Override
public long getIdleTimeout()
{

View File

@ -70,6 +70,12 @@ public class DummyConnection implements LogicalConnection
return null;
}
@Override
public String getId()
{
return "dummy";
}
@Override
public long getIdleTimeout()
{

View File

@ -39,7 +39,7 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection imple
endp.setIdleTimeout(policy.getIdleTimeout());
}
}
@Override
public InetSocketAddress getLocalAddress()
{

View File

@ -58,7 +58,6 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.util.QuoteUtil;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriver;
@ -75,7 +74,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
/**
* Factory to create WebSocket connections
*/
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketContainerScope, WebSocketServletFactory, SessionListener
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketContainerScope, WebSocketServletFactory
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);