422192 - ClientContainer.getOpenSessions() always returns null

+ Adding SessionListener and using it judiciously
This commit is contained in:
Joakim Erdfelt 2013-12-04 12:23:44 -07:00
parent 13b07c10aa
commit 1eac013485
17 changed files with 342 additions and 71 deletions

View File

@ -20,11 +20,13 @@ package org.eclipse.jetty.websocket.jsr356;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -38,12 +40,16 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;
import org.eclipse.jetty.websocket.jsr356.client.AnnotatedClientEndpointMetadata;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
@ -59,8 +65,10 @@ import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
* <p>
* This should be specific to a JVM if run in a standalone mode. or specific to a WebAppContext if running on the Jetty server.
*/
public class ClientContainer extends ContainerLifeCycle implements WebSocketContainer
public class ClientContainer extends ContainerLifeCycle implements WebSocketContainer, SessionListener
{
private static final Logger LOG = Log.getLogger(ClientContainer.class);
/** Tracking all primitive decoders for the container */
private final DecoderFactory decoderFactory;
/** Tracking all primitive encoders for the container */
@ -68,6 +76,8 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
/** Tracking for all declared Client endpoints */
private final Map<Class<?>, EndpointMetadata> endpointClientMetadataCache;
/** Tracking for all open Sessions */
private Set<Session> openSessions = new CopyOnWriteArraySet<>();
/** The jetty websocket client in use for this container */
private WebSocketClient client;
@ -88,7 +98,7 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
client = new WebSocketClient(executor);
client.setEventDriverFactory(new JsrEventDriverFactory(client.getPolicy()));
client.setSessionFactory(new JsrSessionFactory(this));
client.setSessionFactory(new JsrSessionFactory(this,this));
addBean(client);
}
@ -287,8 +297,7 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
*/
public Set<Session> getOpenSessions()
{
// TODO Auto-generated method stub
return null;
return Collections.unmodifiableSet(this.openSessions);
}
private EndpointInstance newClientEndpointInstance(Class<?> endpointClass, ClientEndpointConfig config)
@ -321,6 +330,34 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
return new EndpointInstance(endpoint,cec,metadata);
}
@Override
public void onSessionClosed(WebSocketSession session)
{
if (session instanceof Session)
{
this.openSessions.remove((Session)session);
}
else
{
LOG.warn("JSR356 Implementation should not be mixed with native implementation: Expected {} to implement {}",session.getClass().getName(),
Session.class.getName());
}
}
@Override
public void onSessionOpened(WebSocketSession session)
{
if (session instanceof Session)
{
this.openSessions.add((Session)session);
}
else
{
LOG.warn("JSR356 Implementation should not be mixed with native implementation: Expected {} to implement {}",session.getClass().getName(),
Session.class.getName());
}
}
@Override
public void setAsyncSendTimeout(long ms)
{

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.endpoints.AbstractJsrEventDriver;
@ -72,9 +73,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id)
public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener[] sessionListeners)
{
super(requestURI,websocket,connection);
super(requestURI,websocket,connection,sessionListeners);
if (!(websocket instanceof AbstractJsrEventDriver))
{
throw new IllegalArgumentException("Cannot use, not a JSR WebSocket: " + websocket);

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.endpoints.AbstractJsrEventDriver;
@ -31,16 +32,18 @@ public class JsrSessionFactory implements SessionFactory
{
private AtomicLong idgen = new AtomicLong(0);
private final ClientContainer container;
private final SessionListener[] listeners;
public JsrSessionFactory(ClientContainer container)
public JsrSessionFactory(ClientContainer container, SessionListener... sessionListeners)
{
this.container = container;
this.listeners = sessionListeners;
}
@Override
public WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
{
return new JsrSession(requestURI,websocket,connection,container,getNextId());
return new JsrSession(requestURI,websocket,connection,container,getNextId(),listeners);
}
public String getNextId()

View File

@ -28,6 +28,7 @@ import javax.websocket.DeploymentException;
import javax.websocket.MessageHandler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.jsr356.client.SimpleEndpointMetadata;
@ -64,7 +65,7 @@ public class JsrSessionTest
EventDriver driver = new JsrEndpointEventDriver(policy,ei);
DummyConnection connection = new DummyConnection();
session = new JsrSession(requestURI,driver,connection,container,id);
session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]);
}
@Test

View File

@ -52,7 +52,7 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
EventDriverFactory eventDriverFactory = this.webSocketServerFactory.getEventDriverFactory();
eventDriverFactory.addImplementation(new JsrServerEndpointImpl());
eventDriverFactory.addImplementation(new JsrServerExtendsEndpointImpl());
this.webSocketServerFactory.addSessionFactory(new JsrSessionFactory(this));
this.webSocketServerFactory.addSessionFactory(new JsrSessionFactory(this,this));
}
public EndpointInstance newClientEndpointInstance(Object endpoint, ServerEndpointConfig config, String path)

View File

@ -18,8 +18,7 @@
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.ArrayList;
@ -29,6 +28,7 @@ import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
@ -80,7 +80,7 @@ public class OnPartialTest
DummyConnection connection = new DummyConnection();
ClientContainer container = new ClientContainer();
@SuppressWarnings("resource")
JsrSession session = new JsrSession(requestURI,driver,connection,container,id);
JsrSession session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]);
session.setPolicy(policy);
session.open();
return driver;

View File

@ -23,8 +23,11 @@ import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -51,6 +54,8 @@ import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
@ -59,7 +64,7 @@ import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
/**
* WebSocketClient provides a means of establishing connections to remote websocket endpoints.
*/
public class WebSocketClient extends ContainerLifeCycle
public class WebSocketClient extends ContainerLifeCycle implements SessionListener
{
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
@ -68,6 +73,7 @@ public class WebSocketClient extends ContainerLifeCycle
private final WebSocketExtensionFactory extensionRegistry;
private EventDriverFactory eventDriverFactory;
private SessionFactory sessionFactory;
private Set<WebSocketSession> openSessions = new CopyOnWriteArraySet<>();
private ByteBufferPool bufferPool;
private Executor executor;
private Scheduler scheduler;
@ -101,7 +107,7 @@ public class WebSocketClient extends ContainerLifeCycle
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
this.masker = new RandomMasker();
this.eventDriverFactory = new EventDriverFactory(policy);
this.sessionFactory = new WebSocketSessionFactory();
this.sessionFactory = new WebSocketSessionFactory(this);
}
public Future<Session> connect(Object websocket, URI toUri) throws IOException
@ -196,41 +202,6 @@ public class WebSocketClient extends ContainerLifeCycle
return promise;
}
private synchronized void initialiseClient() throws IOException
{
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
threadPool.setName(name);
executor = threadPool;
addBean(executor,true);
}
else
{
addBean(executor,false);
}
if (connectionManager != null)
{
return;
}
try
{
connectionManager = newConnectionManager();
addBean(connectionManager);
connectionManager.start();
}
catch (IOException e)
{
throw e;
}
catch (Exception e)
{
throw new IOException(e);
}
}
@Override
protected void doStart() throws Exception
{
@ -385,6 +356,11 @@ public class WebSocketClient extends ContainerLifeCycle
return this.policy.getMaxTextMessageSize();
}
public Set<WebSocketSession> getOpenSessions()
{
return Collections.unmodifiableSet(this.openSessions);
}
public WebSocketPolicy getPolicy()
{
return this.policy;
@ -429,6 +405,41 @@ public class WebSocketClient extends ContainerLifeCycle
return extensions;
}
private synchronized void initialiseClient() throws IOException
{
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
threadPool.setName(name);
executor = threadPool;
addBean(executor,true);
}
else
{
addBean(executor,false);
}
if (connectionManager != null)
{
return;
}
try
{
connectionManager = newConnectionManager();
addBean(connectionManager);
connectionManager.start();
}
catch (IOException e)
{
throw e;
}
catch (Exception e)
{
throw new IOException(e);
}
}
/**
* Factory method for new ConnectionManager (used by other projects like cometd)
*
@ -439,6 +450,20 @@ public class WebSocketClient extends ContainerLifeCycle
return new ConnectionManager(this);
}
@Override
public void onSessionClosed(WebSocketSession session)
{
LOG.info("Session Closed: {}",session);
this.openSessions.remove(session);
}
@Override
public void onSessionOpened(WebSocketSession session)
{
LOG.info("Session Opened: {}",session);
this.openSessions.add(session);
}
public void setAsyncWriteTimeout(long ms)
{
this.policy.setAsyncWriteTimeout(ms);
@ -494,7 +519,7 @@ public class WebSocketClient extends ContainerLifeCycle
{
this.policy.setMaxBinaryMessageBufferSize(max);
}
/**
* Set the max idle timeout for new connections.
* <p>

View File

@ -169,4 +169,9 @@ public class JettyTrackingSocket extends WebSocketAdapter
LOG.debug("Waiting for message");
Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void close()
{
getSession().close();
}
}

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SessionTest
{
private BlockheadServer server;
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testBasicEcho_FromClient() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
try
{
JettyTrackingSocket cliSock = new JettyTrackingSocket();
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("echo");
Future<Session> future = client.connect(cliSock,wsUri,request);
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
Session sess = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getSession().getRemote().sendStringByFuture("Hello World!");
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);
Set<WebSocketSession> open = client.getOpenSessions();
Assert.assertThat("Open Sessions.size", open.size(), is(1));
cliSock.assertMessage("Hello World!");
cliSock.close();
srvSock.close();
cliSock.waitForClose(500,TimeUnit.MILLISECONDS);
open = client.getOpenSessions();
Assert.assertThat("Open Sessions.size", open.size(), is(0));
}
finally
{
client.stop();
}
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;

View File

@ -0,0 +1,31 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
/**
* Basic listener interface for Session open/close.
* <p>
* Used primarily for tracking open sessions.
*/
public interface SessionListener
{
public void onSessionOpened(WebSocketSession session);
public void onSessionClosed(WebSocketSession session);
}

View File

@ -58,6 +58,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private final EventDriver websocket;
private final LogicalConnection connection;
private final Executor executor;
private final SessionListener[] sessionListeners;
private ExtensionFactory extensionFactory;
private String protocolVersion;
private Map<String, String[]> parameterMap = new HashMap<>();
@ -68,7 +69,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener[] sessionListeners)
{
if (requestURI == null)
{
@ -78,6 +79,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
this.requestURI = requestURI;
this.websocket = websocket;
this.connection = connection;
this.sessionListeners = sessionListeners;
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
this.incomingHandler = websocket;
@ -333,22 +335,53 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
{
incomingError(cause);
}
@SuppressWarnings("incomplete-switch")
@Override
public void onConnectionStateChange(ConnectionState state)
{
if (state == ConnectionState.CLOSED)
switch (state)
{
IOState ioState = this.connection.getIOState();
// The session only cares about abnormal close, as we need to notify
// the endpoint of this close scenario.
if (ioState.wasAbnormalClose())
{
CloseInfo close = ioState.getCloseInfo();
LOG.debug("Detected abnormal close: {}",close);
// notify local endpoint
notifyClose(close.getStatusCode(),close.getReason());
}
case CLOSING:
// notify session listeners
for (SessionListener listener : sessionListeners)
{
try
{
listener.onSessionClosed(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
}
break;
case CLOSED:
IOState ioState = this.connection.getIOState();
// The session only cares about abnormal close, as we need to notify
// the endpoint of this close scenario.
if (ioState.wasAbnormalClose())
{
CloseInfo close = ioState.getCloseInfo();
LOG.debug("Detected abnormal close: {}",close);
// notify local endpoint
notifyClose(close.getStatusCode(),close.getReason());
}
break;
case OPEN:
// notify session listeners
for (SessionListener listener : sessionListeners)
{
try
{
listener.onSessionOpened(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
}
break;
}
}

View File

@ -29,6 +29,13 @@ import org.eclipse.jetty.websocket.common.events.JettyListenerEventDriver;
*/
public class WebSocketSessionFactory implements SessionFactory
{
private final SessionListener[] listeners;
public WebSocketSessionFactory(SessionListener... sessionListeners)
{
listeners = sessionListeners;
}
@Override
public boolean supports(EventDriver websocket)
{
@ -38,6 +45,6 @@ public class WebSocketSessionFactory implements SessionFactory
@Override
public WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
{
return new WebSocketSession(requestURI,websocket,connection);
return new WebSocketSession(requestURI,websocket,connection,listeners);
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.io;
import java.net.URI;
import org.eclipse.jetty.websocket.common.OutgoingFramesCapture;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.junit.rules.TestName;
@ -32,7 +33,7 @@ public class LocalWebSocketSession extends WebSocketSession
public LocalWebSocketSession(TestName testname, EventDriver driver)
{
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname));
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname), new SessionListener[0]);
this.id = testname.getMethodName();
outgoingCapture = new OutgoingFramesCapture();
setOutgoingHandler(outgoingCapture);

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
@ -78,7 +79,7 @@ public class DummyMuxAddServer implements MuxAddServer
response.append("\r\n");
EventDriver websocket = this.eventDriverFactory.wrap(echo);
WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,channel);
WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,channel, new SessionListener[0]);
UpgradeResponse uresponse = new UpgradeResponse();
uresponse.setAcceptedSubProtocol("echo");
session.setUpgradeResponse(uresponse);

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.mux.helper;
import java.net.URI;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.junit.rules.TestName;
@ -31,7 +32,7 @@ public class LocalWebSocketSession extends WebSocketSession
public LocalWebSocketSession(TestName testname, EventDriver driver)
{
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname));
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname), new SessionListener[0]);
this.id = testname.getMethodName();
outgoingCapture = new OutgoingFramesCapture();
setOutgoingHandler(outgoingCapture);

View File

@ -28,7 +28,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
@ -51,6 +53,7 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.util.QuoteUtil;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriver;
@ -65,7 +68,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
/**
* Factory to create WebSocket connections
*/
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory, SessionListener
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
@ -95,6 +98,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
private final EventDriverFactory eventDriverFactory;
private final WebSocketExtensionFactory extensionFactory;
private List<SessionFactory> sessionFactories;
private Set<WebSocketSession> openSessions = new CopyOnWriteArraySet<>();
private WebSocketCreator creator;
private List<Class<?>> registeredSocketClasses;
@ -119,7 +123,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.eventDriverFactory = new EventDriverFactory(defaultPolicy);
this.extensionFactory = new WebSocketExtensionFactory(defaultPolicy,bufferPool);
this.sessionFactories = new ArrayList<>();
this.sessionFactories.add(new WebSocketSessionFactory());
this.sessionFactories.add(new WebSocketSessionFactory(this));
this.creator = this;
// Create supportedVersions
@ -303,6 +307,11 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return extensionFactory;
}
public Set<WebSocketSession> getOpenSessions()
{
return Collections.unmodifiableSet(this.openSessions);
}
@Override
public WebSocketPolicy getPolicy()
{
@ -371,6 +380,18 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return true;
}
@Override
public void onSessionClosed(WebSocketSession session)
{
this.openSessions.remove(session);
}
@Override
public void onSessionOpened(WebSocketSession session)
{
this.openSessions.add(session);
}
protected String[] parseProtocols(String protocol)
{
if (protocol == null)