Issue #6106 - always decorate javax.websocket Configurators

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-04-06 22:39:15 +10:00
parent a1e522755b
commit f858aa653c
6 changed files with 209 additions and 41 deletions

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.Behavior;
@ -291,7 +292,12 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
headers(headers -> headers.add(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, extensionString));
// Notify the listener which may change the headers directly.
notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
Exception listenerError = notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
if (listenerError != null)
{
abort(listenerError);
return;
}
// Check if extensions were set in the headers from the upgrade listener.
String extsAfterListener = String.join(",", getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, true));
@ -306,8 +312,9 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
}
}
private void notifyUpgradeListeners(Consumer<UpgradeListener> action)
private Exception notifyUpgradeListeners(Consumer<UpgradeListener> action)
{
MultiException multiException = null;
for (UpgradeListener listener : upgradeListeners)
{
try
@ -317,8 +324,13 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
catch (Throwable t)
{
LOG.info("Exception while invoking listener {}", listener, t);
if (multiException == null)
multiException = new MultiException();
multiException.add(t);
}
}
return multiException;
}
public void upgrade(HttpResponse response, EndPoint endPoint)
@ -437,7 +449,9 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);
notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));
Exception listenerError = notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));
if (listenerError != null)
throw new WebSocketException("onHandshakeResponse error", listenerError);
// Now swap out the connection
try

View File

@ -18,17 +18,18 @@ import java.util.List;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.InvalidWebSocketException;
import org.eclipse.jetty.websocket.javax.common.ClientEndpointConfigWrapper;
public class AnnotatedClientEndpointConfig extends ClientEndpointConfigWrapper
{
public AnnotatedClientEndpointConfig(ClientEndpoint anno)
public AnnotatedClientEndpointConfig(ClientEndpoint anno, WebSocketComponents components)
{
Configurator configurator;
try
{
configurator = anno.configurator().getDeclaredConstructor().newInstance();
configurator = components.getObjectFactory().createInstance(anno.configurator());
}
catch (Exception e)
{

View File

@ -229,9 +229,16 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
@Override
public Session connectToServer(final Endpoint endpoint, final ClientEndpointConfig providedConfig, final URI path) throws DeploymentException, IOException
{
ClientEndpointConfig config = providedConfig;
if (config == null)
ClientEndpointConfig config;
if (providedConfig == null)
{
config = new BasicClientEndpointConfig();
}
else
{
config = providedConfig;
components.getObjectFactory().decorate(providedConfig.getConfigurator());
}
ConfiguredEndpoint instance = new ConfiguredEndpoint(endpoint, config);
return connect(instance, path);
@ -240,6 +247,7 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
@Override
public Session connectToServer(Object endpoint, URI path) throws DeploymentException, IOException
{
// The Configurator will be decorated when it is created in the getAnnotatedConfig method.
ClientEndpointConfig config = getAnnotatedConfig(endpoint);
ConfiguredEndpoint instance = new ConfiguredEndpoint(endpoint, config);
return connect(instance, path);
@ -275,7 +283,7 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
if (anno == null)
throw new DeploymentException("Could not get ClientEndpoint annotation for " + endpoint.getClass().getName());
return new AnnotatedClientEndpointConfig(anno);
return new AnnotatedClientEndpointConfig(anno, components);
}
@Override

View File

@ -70,9 +70,7 @@ public class AnnotatedServerEndpointConfig extends ServerEndpointConfigWrapper
else
path = anno.value();
// Make sure all Configurators obtained are decorated.
ServerEndpointConfig.Configurator rawConfigurator = getConfigurator(baseServerConfig, anno);
ServerEndpointConfig.Configurator configurator = containerScope.getObjectFactory().decorate(rawConfigurator);
ServerEndpointConfig.Configurator configurator = getConfigurator(baseServerConfig, anno, containerScope);
// Build a ServerEndpointConfig with the Javax API builder to wrap.
ServerEndpointConfig endpointConfig = ServerEndpointConfig.Builder.create(endpointClass, path)
@ -90,7 +88,7 @@ public class AnnotatedServerEndpointConfig extends ServerEndpointConfigWrapper
init(endpointConfig);
}
private static Configurator getConfigurator(ServerEndpointConfig baseServerConfig, ServerEndpoint anno) throws DeploymentException
private static Configurator getConfigurator(ServerEndpointConfig baseServerConfig, ServerEndpoint anno, JavaxWebSocketContainer containerScope) throws DeploymentException
{
Configurator ret = null;
@ -115,7 +113,7 @@ public class AnnotatedServerEndpointConfig extends ServerEndpointConfigWrapper
// Instantiate the provided configurator
try
{
return anno.configurator().getConstructor().newInstance();
return containerScope.getObjectFactory().createInstance(anno.configurator());
}
catch (Exception e)
{

View File

@ -232,6 +232,9 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
if (isStarted() || isStarting())
{
// Decorate the provided Configurator.
components.getObjectFactory().decorate(providedConfig.getConfigurator());
// If we have annotations merge the annotated ServerEndpointConfig with the provided one.
Class<?> endpointClass = providedConfig.getEndpointClass();
ServerEndpoint anno = endpointClass.getAnnotation(ServerEndpoint.class);

View File

@ -15,19 +15,28 @@ package org.eclipse.jetty.cdi.tests.websocket;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.inject.Inject;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.HandshakeResponse;
import javax.websocket.MessageHandler;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.cdi.CdiDecoratingListener;
import org.eclipse.jetty.cdi.CdiServletContainerInitializer;
@ -35,8 +44,13 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainerProvider;
import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer.Configurator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -50,15 +64,16 @@ public class JavaxWebSocketCdiTest
private Server _server;
private WebSocketContainer _client;
private ServerConnector _connector;
private ServletContextHandler context;
@BeforeEach
public void before() throws Exception
public void before()
{
_server = new Server();
_connector = new ServerConnector(_server);
_server.addConnector(_connector);
ServletContextHandler context = new ServletContextHandler();
context = new ServletContextHandler();
context.setContextPath("/");
// Enable Weld + CDI
@ -66,17 +81,22 @@ public class JavaxWebSocketCdiTest
context.addServletContainerInitializer(new CdiServletContainerInitializer());
context.addServletContainerInitializer(new org.jboss.weld.environment.servlet.EnhancedListener());
// Add WebSocket endpoints
JavaxWebSocketServletContainerInitializer.configure(context, (servletContext, wsContainer) ->
wsContainer.addEndpoint(CdiEchoSocket.class));
// Add to Server
_server.setHandler(context);
}
public void start(Configurator configurator) throws Exception
{
// Add WebSocket endpoints
JavaxWebSocketServletContainerInitializer.configure(context, configurator);
// Start Server
_server.start();
_client = JavaxWebSocketClientContainerProvider.getContainer(null);
// Configure the Client with the same DecoratedObjectFactory from the server.
WebSocketComponents components = WebSocketServerComponents.getWebSocketComponents(context.getServletContext());
_client = new JavaxWebSocketClientContainer(components);
LifeCycle.start(_client);
}
@AfterEach
@ -86,18 +106,94 @@ public class JavaxWebSocketCdiTest
_server.stop();
}
@ClientEndpoint
public static class TestClientEndpoint
@Test
public void testAnnotatedEndpoint() throws Exception
{
start((servletContext, wsContainer) -> wsContainer.addEndpoint(AnnotatedCdiEchoSocket.class));
// If we can get an echo from the websocket endpoint we know that CDI injection of the logger worked as there was no NPE.
AnnotatedCdiClientSocket clientEndpoint = new AnnotatedCdiClientSocket();
URI uri = URI.create("ws://localhost:" + _connector.getLocalPort() + "/echo");
Session session = _client.connectToServer(clientEndpoint, uri);
session.getBasicRemote().sendText("hello world");
assertThat(clientEndpoint._textMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
session.close();
assertTrue(clientEndpoint._closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testConfiguredEndpoint() throws Exception
{
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(ConfiguredCdiEchoSocket.class, "/echo")
.configurator(new ServerConfigurator())
.build();
start((servletContext, wsContainer) -> wsContainer.addEndpoint(serverEndpointConfig));
// If we can get an echo from the websocket endpoint we know that CDI injection of the logger worked as there was no NPE.
ConfiguredCdiClientSocket clientEndpoint = new ConfiguredCdiClientSocket();
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create()
.configurator(new ClientConfigurator())
.build();
URI uri = URI.create("ws://localhost:" + _connector.getLocalPort() + "/echo");
Session session = _client.connectToServer(clientEndpoint, clientEndpointConfig, uri);
session.getBasicRemote().sendText("hello world");
assertThat(clientEndpoint._textMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
session.close();
assertTrue(clientEndpoint._closeLatch.await(5, TimeUnit.SECONDS));
}
public static class ClientConfigurator extends ClientEndpointConfig.Configurator
{
@Inject
public Logger logger;
@Override
public void beforeRequest(Map<String, List<String>> headers)
{
logger.info("beforeRequest");
}
}
public static class ServerConfigurator extends ServerEndpointConfig.Configurator
{
@Inject
public Logger logger;
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response)
{
logger.info("modifyHandshake");
}
}
@ClientEndpoint(configurator = ClientConfigurator.class)
public static class AnnotatedCdiClientSocket
{
@Inject
public Logger logger;
BlockingArrayQueue<String> _textMessages = new BlockingArrayQueue<>();
CountDownLatch _closeLatch = new CountDownLatch(1);
@OnOpen
public void onOpen(Session session)
{
logger.info("onOpen: " + session);
}
@OnMessage
public void onMessage(String message)
{
_textMessages.add(message);
}
@OnError
public void onError(Throwable t)
{
t.printStackTrace();
}
@OnClose
public void onClose()
{
@ -105,21 +201,8 @@ public class JavaxWebSocketCdiTest
}
}
@Test
public void testBasicEcho() throws Exception
{
// If we can get an echo from the websocket endpoint we know that CDI injection of the logger worked as there was no NPE.
TestClientEndpoint clientEndpoint = new TestClientEndpoint();
URI uri = URI.create("ws://localhost:" + _connector.getLocalPort() + "/echo");
Session session = _client.connectToServer(clientEndpoint, uri);
session.getBasicRemote().sendText("hello world");
assertThat(clientEndpoint._textMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
session.close();
assertTrue(clientEndpoint._closeLatch.await(5, TimeUnit.SECONDS));
}
@ServerEndpoint("/echo")
public static class CdiEchoSocket
@ServerEndpoint(value = "/echo", configurator = ServerConfigurator.class)
public static class AnnotatedCdiEchoSocket
{
@Inject
public Logger logger;
@ -144,12 +227,73 @@ public class JavaxWebSocketCdiTest
{
t.printStackTrace();
}
}
@OnClose
public void onClose(CloseReason close)
public static class ConfiguredCdiClientSocket extends Endpoint implements MessageHandler.Whole<String>
{
@Inject
public Logger logger;
BlockingArrayQueue<String> _textMessages = new BlockingArrayQueue<>();
CountDownLatch _closeLatch = new CountDownLatch(1);
@Override
public void onMessage(String message)
{
logger.info("onClose() close:" + close);
this.session = null;
_textMessages.add(message);
}
@Override
public void onOpen(Session session, EndpointConfig config)
{
logger.info("onOpen: " + session);
session.addMessageHandler(this);
}
@Override
public void onError(Session session, Throwable thr)
{
thr.printStackTrace();
}
@Override
public void onClose(Session session, CloseReason closeReason)
{
_closeLatch.countDown();
}
}
public static class ConfiguredCdiEchoSocket extends Endpoint implements MessageHandler.Whole<String>
{
@Inject
public Logger logger;
private Session session;
@Override
public void onMessage(String message)
{
try
{
session.getBasicRemote().sendText(message);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@Override
public void onOpen(Session session, EndpointConfig config)
{
logger.info("onOpen() session:" + session);
this.session = session;
session.addMessageHandler(this);
}
@Override
public void onError(Session session, Throwable thr)
{
thr.printStackTrace();
}
}
}