Issue #1650 - fixing CDI + websocket integration

This commit is contained in:
Joakim Erdfelt 2017-07-03 09:19:17 -07:00
parent 38c112764d
commit 2d65605de1
17 changed files with 183 additions and 117 deletions

View File

@ -2,7 +2,7 @@
<parent>
<groupId>org.eclipse.jetty.cdi</groupId>
<artifactId>jetty-cdi-parent</artifactId>
<version>9.4.7-SNAPSHOT</version>
<version>9.4.7-jws11-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cdi-websocket</artifactId>

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.cdi.websocket;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
@ -26,92 +28,76 @@ import javax.enterprise.inject.spi.CDI;
import org.eclipse.jetty.cdi.core.AnyLiteral;
import org.eclipse.jetty.cdi.core.ScopedInstance;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
public class WebSocketCdiListener extends AbstractContainerListener
public class WebSocketCdiListener extends AbstractContainerListener implements WebSocketSession.Listener
{
static final Logger LOG = Log.getLogger(WebSocketCdiListener.class);
private Map<String, ScopedInstance<WebSocketScopeContext>> instances = new ConcurrentHashMap<>();
@SuppressWarnings(
{ "rawtypes", "unchecked" })
{"rawtypes", "unchecked"})
public static <T> ScopedInstance<T> newInstance(Class<T> clazz)
{
BeanManager bm = CDI.current().getBeanManager();
ScopedInstance sbean = new ScopedInstance();
Set<Bean<?>> beans = bm.getBeans(clazz,AnyLiteral.INSTANCE);
Set<Bean<?>> beans = bm.getBeans(clazz, AnyLiteral.INSTANCE);
if (beans.size() > 0)
{
sbean.bean = beans.iterator().next();
sbean.creationalContext = bm.createCreationalContext(sbean.bean);
sbean.instance = bm.getReference(sbean.bean,clazz,sbean.creationalContext);
sbean.instance = bm.getReference(sbean.bean, clazz, sbean.creationalContext);
return sbean;
}
else
{
throw new RuntimeException(String.format("Can't find class %s",clazz));
throw new RuntimeException(String.format("Can't find class %s", clazz));
}
}
public static class ContainerListener extends AbstractContainerListener
@Override
public void onCreated(WebSocketSession session)
{
private static final Logger LOG = Log.getLogger(WebSocketCdiListener.ContainerListener.class);
private final WebSocketContainerScope container;
private final ScopedInstance<WebSocketScopeContext> wsScope;
String id = toId(session);
public ContainerListener(WebSocketContainerScope container)
{
this.container = container;
this.wsScope = newInstance(WebSocketScopeContext.class);
this.wsScope.instance.create();
}
ScopedInstance<WebSocketScopeContext> wsScope = newInstance(WebSocketScopeContext.class);
wsScope.instance.create();
wsScope.instance.begin();
wsScope.instance.setSession(session);
@Override
public void lifeCycleStarted(LifeCycle event)
{
if (event == container)
{
if (LOG.isDebugEnabled())
{
LOG.debug("starting websocket container [{}]",event);
}
wsScope.instance.begin();
return;
}
if (event instanceof WebSocketSessionScope)
{
if (LOG.isDebugEnabled())
{
LOG.debug("starting websocket session [{}]",event);
}
wsScope.instance.setSession((Session)event);
return;
}
}
instances.put(id, wsScope);
}
@Override
public void lifeCycleStopped(LifeCycle event)
@Override
public void onOpened(WebSocketSession session)
{
// do nothing
}
@Override
public void onClosed(WebSocketSession session)
{
String id = toId(session);
ScopedInstance<WebSocketScopeContext> wsScope = instances.remove(id);
if (wsScope != null)
{
if (event == container)
{
if (LOG.isDebugEnabled())
{
LOG.debug("stopped websocket container [{}]",event);
}
this.wsScope.instance.end();
this.wsScope.instance.destroy();
this.wsScope.destroy();
}
wsScope.instance.end();
wsScope.instance.destroy();
wsScope.destroy();
}
}
private String toId(WebSocketSession session)
{
return session.getRemoteAddress().toString() + ">" + session.getLocalAddress().toString();
}
@Override
public void lifeCycleStarting(LifeCycle event)
{
@ -119,19 +105,11 @@ public class WebSocketCdiListener extends AbstractContainerListener
{
if (LOG.isDebugEnabled())
{
LOG.debug("started websocket container [{}]",event);
}
ContainerListener listener = new ContainerListener((WebSocketContainerScope)event);
if (event instanceof ContainerLifeCycle)
{
ContainerLifeCycle container = (ContainerLifeCycle)event;
container.addLifeCycleListener(listener);
container.addEventListener(listener);
}
else
{
throw new RuntimeException("Unable to setup CDI against non-container: " + event.getClass().getName());
LOG.debug("started websocket container [{}]", event);
}
WebSocketContainerScope webSocketContainerScope = (WebSocketContainerScope) event;
webSocketContainerScope.addSessionListener(this);
}
}
}

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.util.WSURI;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.junit.AfterClass;
@ -78,14 +79,8 @@ public class BasicAppTest
server.start();
String host = connector.getHost();
if (host == null)
{
host = "localhost";
}
int port = connector.getLocalPort();
serverHttpURI = new URI(String.format("http://%s:%d/",host,port));
serverWebsocketURI = new URI(String.format("ws://%s:%d/",host,port));
serverHttpURI = server.getURI().resolve("/");
serverWebsocketURI = WSURI.toWebsocket(serverHttpURI);
}
@AfterClass
@ -118,7 +113,6 @@ public class BasicAppTest
assertThat("Messages received",socket.getTextMessages().size(),is(1));
String response = socket.getTextMessages().poll();
System.err.println(response);
assertThat("Message[0]",response,is("Hello World"));
}

View File

@ -141,7 +141,6 @@ public class CdiAppTest
assertThat("Messages received",socket.getTextMessages().size(),is(1));
String response = socket.getTextMessages().poll();
System.err.println(response);
assertThat("Message[0]",response,
allOf(
@ -173,9 +172,7 @@ public class CdiAppTest
assertThat("Messages received",socket.getTextMessages().size(),is(2));
String response = socket.getTextMessages().poll();
System.out.println("[0]" + response);
assertThat("Message[0]",response,containsString("Hello there stuff"));
System.out.println("[1]" + socket.getTextMessages().poll());
}
finally
{

View File

@ -18,9 +18,9 @@
<module>cdi-core</module>
<module>cdi-servlet</module>
<module>cdi-full-servlet</module>
<module>test-cdi-webapp</module>
<!-- deprecated
<module>cdi-websocket</module>
<module>test-cdi-webapp</module>
<!-- deprecated
<module>test-cdi-it</module>
-->
</modules>

View File

@ -93,11 +93,7 @@ public class SessionInfoIT
socket.session.getBasicRemote().sendText("info");
socket.messages.awaitEventCount(1,2,TimeUnit.SECONDS);
System.out.printf("socket.messages.size = %s%n",socket.messages.size());
String msg = socket.messages.poll();
System.out.printf("Message is [%s]%n",msg);
assertThat("Message", msg, containsString("HttpSession = HttpSession"));
socket.session.getBasicRemote().sendText("close");

View File

@ -401,6 +401,18 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
}
return config;
}
@Override
public void addSessionListener(WebSocketSession.Listener listener)
{
this.scopeDelegate.addSessionListener(listener);
}
@Override
public boolean removeSessionListener(WebSocketSession.Listener listener)
{
return this.scopeDelegate.removeSessionListener(listener);
}
@Override
public void onSessionClosed(WebSocketSession session)

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.common.ManagedEndpoint;
public class ConfiguredEndpoint implements ManagedEndpoint
{
/** The instance of the Endpoint */
private final Object endpoint;
private Object endpoint;
/** The optional instance specific configuration for the Endpoint */
private final EndpointConfig config;
@ -43,14 +43,15 @@ public class ConfiguredEndpoint implements ManagedEndpoint
return config;
}
public Object getEndpoint()
{
return endpoint;
}
@Override
public Object getRawEndpoint()
{
return endpoint;
}
@Override
public void setRawEndpoint(Object rawEndpoint)
{
this.endpoint = rawEndpoint;
}
}

View File

@ -24,11 +24,14 @@ import java.net.SocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
@ -67,10 +70,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
// From HttpClient
private final HttpClient httpClient;
//
// The container
private final WebSocketContainerScope containerScope;
private final WebSocketExtensionFactory extensionRegistry;
private SessionFactory sessionFactory;
private final List<WebSocketSession.Listener> listeners = new CopyOnWriteArrayList<>();
private final int id = ThreadLocalRandom.current().nextInt();
@ -557,6 +561,33 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
}
}
protected void notifySessionListeners(Consumer<WebSocketSession.Listener> consumer)
{
for (WebSocketSession.Listener listener : listeners)
{
try
{
consumer.accept(listener);
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
@Override
public void addSessionListener(WebSocketSession.Listener listener)
{
this.listeners.add(listener);
}
@Override
public boolean removeSessionListener(WebSocketSession.Listener listener)
{
return this.listeners.remove(listener);
}
@Override
public void onSessionClosed(WebSocketSession session)
{

View File

@ -596,7 +596,8 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
URI requestURI = this.getURI();
WebSocketSession session = getSessionFactory().createSession(requestURI,localEndpoint,connection);
WebSocketSession session = getSessionFactory().createSession(requestURI, localEndpoint, connection);
wsClient.notifySessionListeners((listener -> listener.onCreated(session)));
session.setUpgradeRequest(new ClientUpgradeRequest(this));
session.setUpgradeResponse(new ClientUpgradeResponse(response));
connection.addListener(session);

View File

@ -24,4 +24,6 @@ package org.eclipse.jetty.websocket.common;
public interface ManagedEndpoint
{
Object getRawEndpoint();
void setRawEndpoint(Object rawEndpoint);
}

View File

@ -89,8 +89,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private final AtomicBoolean closeSent = new AtomicBoolean(false);
private final AtomicBoolean closeNotified = new AtomicBoolean(false);
// The websocket endpoint object itself
private final Object endpoint;
/* The websocket endpoint object itself.
* Not declared final, as it can be decorated by other libraries (like CDI)
*/
private Object endpoint;
// Callbacks
private FrameCallback onDisconnectCallback = new CompletionCallback()
@ -225,12 +227,26 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
if (LOG.isDebugEnabled())
LOG.debug("Using RemoteEndpointFactory: {}", remoteEndpointFactory);
// Start WebSocketSession before decorating the endpoint (CDI requirement)
super.doStart();
// Decorate endpoint only after WebSocketSession has been started (CDI requirement)
if(this.endpoint instanceof ManagedEndpoint)
{
ManagedEndpoint managedEndpoint = (ManagedEndpoint) this.endpoint;
Object rawEndpoint = managedEndpoint.getRawEndpoint();
rawEndpoint = this.containerScope.getObjectFactory().decorate(rawEndpoint);
managedEndpoint.setRawEndpoint(rawEndpoint);
}
else
{
this.endpoint = this.containerScope.getObjectFactory().decorate(this.endpoint);
}
this.endpointFunctions = newEndpointFunctions(this.endpoint);
addManaged(this.endpointFunctions);
super.doStart();
connection.setMaxIdleTimeout(this.policy.getIdleTimeout());
Throwable fastFail;
@ -963,6 +979,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
public interface Listener
{
@SuppressWarnings("unused")
default void onCreated(WebSocketSession session) { }
void onOpened(WebSocketSession session);
void onClosed(WebSocketSession session);

View File

@ -112,18 +112,10 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
@Override
protected void doStart() throws Exception
{
LOG.info("Starting");
discoverEndpointFunctions(this.endpoint);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
LOG.info("Stopping");
super.doStop();
}
protected void discoverEndpointFunctions(Object endpoint)
{
boolean supportAnnotations = true;

View File

@ -72,7 +72,19 @@ public class DelegatedContainerScope implements WebSocketContainerScope
{
return this.delegate.isRunning();
}
@Override
public void addSessionListener(WebSocketSession.Listener listener)
{
this.delegate.addSessionListener(listener);
}
@Override
public boolean removeSessionListener(WebSocketSession.Listener listener)
{
return this.delegate.removeSessionListener(listener);
}
@Override
public void onSessionOpened(WebSocketSession session)
{

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.common.scopes;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
@ -35,6 +37,7 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
private final DecoratedObjectFactory objectFactory;
private final WebSocketPolicy containerPolicy;
private final Executor executor;
protected final List<WebSocketSession.Listener> listeners = new CopyOnWriteArrayList<>();
private SslContextFactory sslContextFactory;
public SimpleContainerScope(WebSocketPolicy policy)
@ -115,7 +118,19 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
{
this.sslContextFactory = sslContextFactory;
}
@Override
public void addSessionListener(WebSocketSession.Listener listener)
{
this.listeners.add(listener);
}
@Override
public boolean removeSessionListener(WebSocketSession.Listener listener)
{
return this.listeners.remove(listener);
}
@Override
public void onSessionOpened(WebSocketSession session)
{

View File

@ -72,6 +72,21 @@ public interface WebSocketContainerScope
* @return true if container is started and running
*/
boolean isRunning();
/**
* Add a WebSocketSession.Listener for create/open/close events
*
* @param listener the listener to add
*/
void addSessionListener(WebSocketSession.Listener listener);
/**
* Remove a WebSocketSession.Listener
*
* @param listener the listener to remove
* @return true if a listener was removed, false if no listener was removed
*/
boolean removeSessionListener(WebSocketSession.Listener listener);
/**
* A Session has been opened

View File

@ -171,15 +171,17 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
addBean(scheduler);
addBean(bufferPool);
}
@Override
public void addSessionListener(WebSocketSession.Listener listener)
{
listeners.add(listener);
}
public void removeSessionListener(WebSocketSession.Listener listener)
@Override
public boolean removeSessionListener(WebSocketSession.Listener listener)
{
listeners.remove(listener);
return listeners.remove(listener);
}
@Override
@ -215,9 +217,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return false;
}
// Allow Decorators to do their thing
websocketPojo = getObjectFactory().decorate(websocketPojo);
// Get the original HTTPConnection
HttpConnection connection = (HttpConnection) request.getAttribute("org.eclipse.jetty.server.HttpConnection");
@ -266,7 +265,9 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{
try
{
return impl.createSession(requestURI, websocket, connection);
WebSocketSession session = impl.createSession(requestURI, websocket, connection);
notifySessionListeners((listener -> listener.onCreated(session)));
return session;
}
catch (Throwable e)
{