478829 - WebsocketSession not cleaned up / memory leak

+ Reducing looping references Session -> otherObj -> Session
 + Using Container LifeCycle bean management more consistently
 + All sessions are now child beans
 + A stopped session that hasn't been closed, will auto-close now
 + Using SessionListener more consistently
 + Client ConnectionManager no longer tracks Sessions
 + EventDriver stop cleans up its Session references
 + Moving all DummyConnection test classes to websocket-common:tests
This commit is contained in:
Joakim Erdfelt 2015-10-01 18:18:20 -07:00
parent f4133dfd35
commit 1e8d0db743
17 changed files with 86 additions and 281 deletions

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.jsr356;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.nio.ByteBuffer;
@ -29,6 +29,7 @@ import javax.websocket.MessageHandler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.jsr356.client.SimpleEndpointMetadata;
import org.eclipse.jetty.websocket.jsr356.endpoints.EndpointInstance;
@ -37,7 +38,6 @@ import org.eclipse.jetty.websocket.jsr356.handlers.ByteArrayWholeHandler;
import org.eclipse.jetty.websocket.jsr356.handlers.ByteBufferPartialHandler;
import org.eclipse.jetty.websocket.jsr356.handlers.LongMessageHandler;
import org.eclipse.jetty.websocket.jsr356.handlers.StringWholeHandler;
import org.eclipse.jetty.websocket.jsr356.samples.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.samples.DummyEndpoint;
import org.junit.Assert;
import org.junit.Before;

View File

@ -1,156 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.samples;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState;
public class DummyConnection implements LogicalConnection
{
private IOState iostate;
public DummyConnection()
{
this.iostate = new IOState();
}
@Override
public void close()
{
}
@Override
public void close(int statusCode, String reason)
{
}
@Override
public void disconnect()
{
}
@Override
public ByteBufferPool getBufferPool()
{
return null;
}
@Override
public Executor getExecutor()
{
return null;
}
@Override
public long getIdleTimeout()
{
return 0;
}
@Override
public IOState getIOState()
{
return this.iostate;
}
@Override
public InetSocketAddress getLocalAddress()
{
return null;
}
@Override
public long getMaxIdleTimeout()
{
return 0;
}
@Override
public WebSocketPolicy getPolicy()
{
return null;
}
@Override
public InetSocketAddress getRemoteAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public WebSocketSession getSession()
{
return null;
}
@Override
public boolean isOpen()
{
return false;
}
@Override
public boolean isReading()
{
return false;
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
}
@Override
public void resume()
{
}
@Override
public void setMaxIdleTimeout(long ms)
{
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
}
@Override
public void setSession(WebSocketSession session)
{
}
@Override
public SuspendToken suspend()
{
return null;
}
}

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;

View File

@ -499,6 +499,7 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
{
if (LOG.isDebugEnabled())
LOG.debug("Session Opened: {}",session);
addManaged(session);
}
public void setAsyncWriteTimeout(long ms)

View File

@ -23,19 +23,13 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/**
@ -136,7 +130,6 @@ public class ConnectionManager extends ContainerLifeCycle
return new InetSocketAddress(uri.getHost(),port);
}
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
private final WebSocketClient client;
private WebSocketClientSelectorManager selector;
@ -145,31 +138,6 @@ public class ConnectionManager extends ContainerLifeCycle
this.client = client;
}
public void addSession(WebSocketSession session)
{
sessions.add(session);
}
private void shutdownAllConnections()
{
for (WebSocketSession session : sessions)
{
if (session.getConnection() != null)
{
try
{
session.getConnection().close(
StatusCode.SHUTDOWN,
"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Shutdown All Connections",t);
}
}
}
}
public ConnectPromise connect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
{
return new PhysicalConnect(client,driver,request);
@ -189,8 +157,6 @@ public class ConnectionManager extends ContainerLifeCycle
@Override
protected void doStop() throws Exception
{
shutdownAllConnections();
sessions.clear();
super.doStop();
removeBean(selector);
}
@ -200,11 +166,6 @@ public class ConnectionManager extends ContainerLifeCycle
return selector;
}
public Collection<WebSocketSession> getSessions()
{
return Collections.unmodifiableCollection(sessions);
}
/**
* Factory method for new WebSocketClientSelectorManager (used by other projects like cometd)
*
@ -216,9 +177,4 @@ public class ConnectionManager extends ContainerLifeCycle
{
return new WebSocketClientSelectorManager(client);
}
public void removeSession(WebSocketSession session)
{
sessions.remove(session);
}
}

View File

@ -334,7 +334,7 @@ public class UpgradeConnection extends AbstractConnection implements Connection.
session.setOutgoingHandler(extensionStack);
extensionStack.setNextOutgoing(connection);
session.addBean(extensionStack);
session.addManaged(extensionStack);
connectPromise.getClient().addManaged(session);
// Now swap out the connection

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
@ -41,12 +42,14 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
private final ConnectPromise connectPromise;
private final Masker masker;
private final AtomicBoolean opened = new AtomicBoolean(false);
private final SessionListener sessionListener;
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise, WebSocketPolicy policy)
{
super(endp,executor,connectPromise.getClient().getScheduler(),policy,connectPromise.getClient().getBufferPool());
this.connectPromise = connectPromise;
this.masker = connectPromise.getMasker();
this.sessionListener = connectPromise.getClient();
assert (this.masker != null);
}
@ -66,8 +69,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
public void onClose()
{
super.onClose();
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.removeSession(getSession());
sessionListener.onSessionClosed(getSession());
}
@Override
@ -77,8 +79,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
if (!beenOpened)
{
WebSocketSession session = getSession();
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(session);
sessionListener.onSessionOpened(session);
connectPromise.succeeded(session);
}
super.onOpen();

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -81,7 +82,8 @@ public class SessionTest
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
Collection<WebSocketSession> sessions = client.getBeans(WebSocketSession.class);
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");

View File

@ -18,14 +18,12 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@ -37,6 +35,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
@ -118,7 +117,8 @@ public class WebSocketClientTest
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
Collection<WebSocketSession> sessions = client.getBeans(WebSocketSession.class);
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
@ -164,7 +164,8 @@ public class WebSocketClientTest
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
Collection<WebSocketSession> sessions = client.getBeans(WebSocketSession.class);
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
FutureWriteCallback callback = new FutureWriteCallback();

View File

@ -63,8 +63,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private final WebSocketContainerScope containerScope;
private final URI requestURI;
private final EventDriver websocket;
private final LogicalConnection connection;
private final EventDriver websocket;
private final SessionListener[] sessionListeners;
private final Executor executor;
private ClassLoader classLoader;
@ -93,6 +93,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
this.outgoingHandler = connection;
this.incomingHandler = websocket;
this.connection.getIOState().addListener(this);
addBean(this.connection);
addBean(this.websocket);
}
@Override
@ -130,6 +133,35 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
executor.execute(runnable);
}
@Override
protected void doStart() throws Exception
{
if(LOG.isDebugEnabled())
LOG.debug("starting - {}",this);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
if(LOG.isDebugEnabled())
LOG.debug("stopping - {}",this);
if (getConnection() != null)
{
try
{
getConnection().close(StatusCode.SHUTDOWN,"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Connection Shutdown",t);
}
}
super.doStop();
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -381,6 +413,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
switch (state)
{
case CLOSED:
IOState ioState = this.connection.getIOState();
CloseInfo close = ioState.getCloseInfo();
// confirmed close of local endpoint
notifyClose(close.getStatusCode(),close.getReason());
// notify session listeners
for (SessionListener listener : sessionListeners)
{
@ -395,10 +432,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
LOG.ignore(t);
}
}
IOState ioState = this.connection.getIOState();
CloseInfo close = ioState.getCloseInfo();
// confirmed close of local endpoint
notifyClose(close.getStatusCode(),close.getReason());
break;
case CONNECTED:
// notify session listeners
@ -418,7 +451,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
break;
}
}
/**
* Open/Activate the session
*/
@ -467,7 +500,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
close(statusCode,t.getMessage());
}
}
public void setExtensionFactory(ExtensionFactory extensionFactory)
{
this.extensionFactory = extensionFactory;

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
@ -40,11 +41,11 @@ import org.eclipse.jetty.websocket.common.message.MessageAppender;
/**
* EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
*/
public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
public abstract class AbstractEventDriver extends AbstractLifeCycle implements IncomingFrames, EventDriver
{
private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
protected final Logger TARGET_LOG;
protected final WebSocketPolicy policy;
protected WebSocketPolicy policy;
protected final Object websocket;
protected WebSocketSession session;
protected MessageAppender activeMessage;
@ -233,6 +234,12 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
throw t;
}
}
@Override
protected void doStop() throws Exception
{
session = null;
}
protected void terminateConnection(int statusCode, String rawreason)
{

View File

@ -55,7 +55,8 @@ public class CallableMethod
if (obj == null)
{
LOG.warn("Cannot call {} on null object",this.method);
String err = String.format("Cannot call %s on null object", this.method);
LOG.warn(new RuntimeException(err));
return null;
}

View File

@ -200,7 +200,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".close");
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_close");
/**
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
@ -214,7 +214,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final WebSocketPolicy policy;
private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private WebSocketSession session;
private WebSocketSession session; // TODO: Should not be part of Connection
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer prefillBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
package org.eclipse.jetty.websocket.common.test;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;

View File

@ -22,13 +22,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
@ -53,7 +52,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@ -94,7 +92,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
private final WebSocketExtensionFactory extensionFactory;
private Executor executor;
private List<SessionFactory> sessionFactories;
private Set<WebSocketSession> openSessions = new CopyOnWriteArraySet<>();
private WebSocketCreator creator;
private List<Class<?>> registeredSocketClasses;
private DecoratedObjectFactory objectFactory;
@ -228,27 +225,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
}
}
protected void shutdownAllConnections()
{
for (WebSocketSession session : openSessions)
{
if (session.getConnection() != null)
{
try
{
session.getConnection().close(
StatusCode.SHUTDOWN,
"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Shutdown All Connections",t);
}
}
}
openSessions.clear();
}
@Override
public WebSocketServletFactory createFactory(WebSocketPolicy policy)
{
@ -318,13 +294,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
super.doStart();
}
@Override
protected void doStop() throws Exception
{
shutdownAllConnections();
super.doStop();
}
@Override
public ByteBufferPool getBufferPool()
{
@ -359,9 +328,9 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return extensionFactory;
}
public Set<WebSocketSession> getOpenSessions()
public Collection<WebSocketSession> getOpenSessions()
{
return Collections.unmodifiableSet(this.openSessions);
return getBeans(WebSocketSession.class);
}
@Override
@ -484,13 +453,13 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
@Override
public void onSessionClosed(WebSocketSession session)
{
this.openSessions.remove(session);
removeBean(session);
}
@Override
public void onSessionOpened(WebSocketSession session)
{
this.openSessions.add(session);
addManaged(session);
}
protected String[] parseProtocols(String protocol)
@ -636,24 +605,13 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
extensionStack.setNextOutgoing(wsConnection);
// Start Components
session.addBean(extensionStack);
this.addBean(session);
session.addManaged(extensionStack);
this.addManaged(session);
if (session.isFailed())
{
throw new IOException("Session failed to start");
}
else if (!session.isRunning())
{
try
{
session.start();
}
catch (Exception e)
{
throw new IOException("Unable to start Session",e);
}
}
// Tell jetty about the new upgraded connection
request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, wsConnection);

View File

@ -22,8 +22,8 @@ import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -144,7 +144,7 @@ public class ManyConnectionsCleanupTest
calls.incrementAndGet();
if (message.equalsIgnoreCase("openSessions"))
{
Set<WebSocketSession> sessions = container.getOpenSessions();
Collection<WebSocketSession> sessions = container.getOpenSessions();
StringBuilder ret = new StringBuilder();
ret.append("openSessions.size=").append(sessions.size()).append('\n');
@ -336,7 +336,7 @@ public class ManyConnectionsCleanupTest
client.sendStandardRequest();
client.expectUpgradeResponse();
client.readFrames(1,1,TimeUnit.SECONDS);
// client.readFrames(1,2,TimeUnit.SECONDS);
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
client.write(close.asFrame()); // respond with close

View File

@ -22,8 +22,8 @@ import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -139,7 +139,7 @@ public class WebSocketCloseTest
LOG.debug("onWebSocketText({})",message);
if (message.equalsIgnoreCase("openSessions"))
{
Set<WebSocketSession> sessions = container.getOpenSessions();
Collection<WebSocketSession> sessions = container.getOpenSessions();
StringBuilder ret = new StringBuilder();
ret.append("openSessions.size=").append(sessions.size()).append('\n');