Merge pull request #4987 from eclipse/jetty-9.4.x-1100-WebSocketEncoderLifeCycle
Issue #1100 - ensure init and destroy are always called on JSR356 Encoders
This commit is contained in:
commit
1b59672b7f
|
@ -308,6 +308,8 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
|
||||||
protected void doStop() throws Exception
|
protected void doStop() throws Exception
|
||||||
{
|
{
|
||||||
ShutdownThread.deregister(this);
|
ShutdownThread.deregister(this);
|
||||||
|
this.encoderFactory.destroy();
|
||||||
|
this.decoderFactory.destroy();
|
||||||
endpointClientMetadataCache.clear();
|
endpointClientMetadataCache.clear();
|
||||||
super.doStop();
|
super.doStop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,4 +26,6 @@ import javax.websocket.EndpointConfig;
|
||||||
public interface Configurable
|
public interface Configurable
|
||||||
{
|
{
|
||||||
void init(EndpointConfig config);
|
void init(EndpointConfig config);
|
||||||
|
|
||||||
|
void destroy();
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,6 +69,12 @@ public class DecoderFactory implements Configurable
|
||||||
{
|
{
|
||||||
this.decoder.init(config);
|
this.decoder.init(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
|
this.decoder.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Logger LOG = Log.getLogger(DecoderFactory.class);
|
private static final Logger LOG = Log.getLogger(DecoderFactory.class);
|
||||||
|
@ -185,6 +191,17 @@ public class DecoderFactory implements Configurable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
|
for (Wrapper wrapper : activeWrappers.values())
|
||||||
|
{
|
||||||
|
wrapper.decoder.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
activeWrappers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
public Wrapper newWrapper(DecoderMetadata metadata)
|
public Wrapper newWrapper(DecoderMetadata metadata)
|
||||||
{
|
{
|
||||||
Class<? extends Decoder> decoderClass = metadata.getCoderClass();
|
Class<? extends Decoder> decoderClass = metadata.getCoderClass();
|
||||||
|
|
|
@ -62,14 +62,21 @@ public class EncoderFactory implements Configurable
|
||||||
{
|
{
|
||||||
this.encoder.init(config);
|
this.encoder.init(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
|
this.encoder.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Logger LOG = Log.getLogger(EncoderFactory.class);
|
private static final Logger LOG = Log.getLogger(EncoderFactory.class);
|
||||||
|
|
||||||
private final EncoderMetadataSet metadatas;
|
private final EncoderMetadataSet metadatas;
|
||||||
private final WebSocketContainerScope containerScope;
|
private final WebSocketContainerScope containerScope;
|
||||||
private EncoderFactory parentFactory;
|
private final Map<Class<?>, Wrapper> activeWrappers;
|
||||||
private Map<Class<?>, Wrapper> activeWrappers;
|
private final EncoderFactory parentFactory;
|
||||||
|
private EndpointConfig endpointConfig;
|
||||||
|
|
||||||
public EncoderFactory(WebSocketContainerScope containerScope, EncoderMetadataSet metadatas)
|
public EncoderFactory(WebSocketContainerScope containerScope, EncoderMetadataSet metadatas)
|
||||||
{
|
{
|
||||||
|
@ -153,10 +160,9 @@ public class EncoderFactory implements Configurable
|
||||||
@Override
|
@Override
|
||||||
public void init(EndpointConfig config)
|
public void init(EndpointConfig config)
|
||||||
{
|
{
|
||||||
|
this.endpointConfig = config;
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
{
|
LOG.debug("init({})", endpointConfig);
|
||||||
LOG.debug("init({})", config);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Instantiate all declared encoders
|
// Instantiate all declared encoders
|
||||||
for (EncoderMetadata metadata : metadatas)
|
for (EncoderMetadata metadata : metadatas)
|
||||||
|
@ -164,20 +170,29 @@ public class EncoderFactory implements Configurable
|
||||||
Wrapper wrapper = newWrapper(metadata);
|
Wrapper wrapper = newWrapper(metadata);
|
||||||
activeWrappers.put(metadata.getObjectType(), wrapper);
|
activeWrappers.put(metadata.getObjectType(), wrapper);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize all encoders
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
for (Wrapper wrapper : activeWrappers.values())
|
for (Wrapper wrapper : activeWrappers.values())
|
||||||
{
|
{
|
||||||
wrapper.encoder.init(config);
|
wrapper.encoder.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
activeWrappers.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Wrapper newWrapper(EncoderMetadata metadata)
|
private Wrapper newWrapper(EncoderMetadata metadata)
|
||||||
{
|
{
|
||||||
|
if (endpointConfig == null)
|
||||||
|
throw new IllegalStateException("EndpointConfig not set");
|
||||||
|
|
||||||
Class<? extends Encoder> encoderClass = metadata.getCoderClass();
|
Class<? extends Encoder> encoderClass = metadata.getCoderClass();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Encoder encoder = containerScope.getObjectFactory().createInstance(encoderClass);
|
Encoder encoder = containerScope.getObjectFactory().createInstance(encoderClass);
|
||||||
|
encoder.init(endpointConfig);
|
||||||
return new Wrapper(encoder, metadata);
|
return new Wrapper(encoder, metadata);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
|
|
@ -306,6 +306,13 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
|
||||||
decoderFactory.init(config);
|
decoderFactory.init(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
|
encoderFactory.destroy();
|
||||||
|
decoderFactory.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeMessageHandler(MessageHandler handler)
|
public void removeMessageHandler(MessageHandler handler)
|
||||||
{
|
{
|
||||||
|
|
|
@ -77,6 +77,10 @@ public abstract class AbstractJsrEventDriver extends AbstractEventDriver
|
||||||
CloseCode closecode = CloseCodes.getCloseCode(close.getStatusCode());
|
CloseCode closecode = CloseCodes.getCloseCode(close.getStatusCode());
|
||||||
CloseReason closereason = new CloseReason(closecode, close.getReason());
|
CloseReason closereason = new CloseReason(closecode, close.getReason());
|
||||||
onClose(closereason);
|
onClose(closereason);
|
||||||
|
|
||||||
|
// Destroy the JsrSession.
|
||||||
|
if (jsrsession != null)
|
||||||
|
jsrsession.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void onClose(CloseReason closereason);
|
protected abstract void onClose(CloseReason closereason);
|
||||||
|
|
|
@ -0,0 +1,186 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.websocket.jsr356.server;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import javax.websocket.ClientEndpointConfig;
|
||||||
|
import javax.websocket.CloseReason;
|
||||||
|
import javax.websocket.ContainerProvider;
|
||||||
|
import javax.websocket.Encoder;
|
||||||
|
import javax.websocket.Endpoint;
|
||||||
|
import javax.websocket.EndpointConfig;
|
||||||
|
import javax.websocket.MessageHandler;
|
||||||
|
import javax.websocket.Session;
|
||||||
|
import javax.websocket.WebSocketContainer;
|
||||||
|
import javax.websocket.server.ServerEndpointConfig;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.websocket.jsr356.EncoderFactory;
|
||||||
|
import org.eclipse.jetty.websocket.jsr356.JsrSession;
|
||||||
|
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
|
||||||
|
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.EchoReturnEndpoint;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class EncoderLifeCycleTest
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(EncoderLifeCycleTest.class);
|
||||||
|
private static Server server;
|
||||||
|
private static URI serverUri;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void startServer() throws Exception
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
ServerConnector connector = new ServerConnector(server);
|
||||||
|
server.addConnector(connector);
|
||||||
|
|
||||||
|
ServletContextHandler contextHandler = new ServletContextHandler();
|
||||||
|
contextHandler.setContextPath("/");
|
||||||
|
server.setHandler(contextHandler);
|
||||||
|
|
||||||
|
WebSocketServerContainerInitializer.configure(contextHandler, ((servletContext, serverContainer) ->
|
||||||
|
serverContainer.addEndpoint(ServerEndpointConfig.Builder.create(EchoReturnEndpoint.class, "/").build())));
|
||||||
|
|
||||||
|
// Start Server
|
||||||
|
server.start();
|
||||||
|
serverUri = new URI(String.format("ws://localhost:%d/", connector.getLocalPort()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class StringHolder
|
||||||
|
{
|
||||||
|
private final String string;
|
||||||
|
|
||||||
|
public StringHolder(String msg)
|
||||||
|
{
|
||||||
|
string = msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getString()
|
||||||
|
{
|
||||||
|
return string;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class StringHolderSubtype extends StringHolder
|
||||||
|
{
|
||||||
|
public StringHolderSubtype(String msg)
|
||||||
|
{
|
||||||
|
super(msg + "|subtype");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyEncoder implements Encoder.Text<StringHolder>
|
||||||
|
{
|
||||||
|
public CountDownLatch initialized = new CountDownLatch(1);
|
||||||
|
public CountDownLatch destroyed = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(EndpointConfig config)
|
||||||
|
{
|
||||||
|
initialized.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String encode(StringHolder message)
|
||||||
|
{
|
||||||
|
return message.getString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy()
|
||||||
|
{
|
||||||
|
destroyed.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TextMessageEndpoint extends Endpoint implements MessageHandler.Whole<String>
|
||||||
|
{
|
||||||
|
public BlockingArrayQueue<String> textMessages = new BlockingArrayQueue<>();
|
||||||
|
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||||
|
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
public CloseReason closeReason = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOpen(Session session, EndpointConfig config)
|
||||||
|
{
|
||||||
|
session.addMessageHandler(this);
|
||||||
|
this.openLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, CloseReason closeReason)
|
||||||
|
{
|
||||||
|
this.closeReason = closeReason;
|
||||||
|
this.closeLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message)
|
||||||
|
{
|
||||||
|
this.textMessages.add(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(classes = {StringHolder.class, StringHolderSubtype.class})
|
||||||
|
public void testEncoderLifeCycle(Class<? extends StringHolder> clazz) throws Exception
|
||||||
|
{
|
||||||
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
||||||
|
TextMessageEndpoint clientEndpoint = new TextMessageEndpoint();
|
||||||
|
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
|
||||||
|
.encoders(Collections.singletonList(MyEncoder.class))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Send an instance of our StringHolder type.
|
||||||
|
Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
|
||||||
|
StringHolder data = clazz.getConstructor(String.class).newInstance("test1");
|
||||||
|
session.getBasicRemote().sendObject(data);
|
||||||
|
|
||||||
|
// We received the expected echo.
|
||||||
|
String echoed = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS);
|
||||||
|
assertThat("Echoed message", echoed, is(data.getString()));
|
||||||
|
|
||||||
|
// Verify that the encoder has been opened.
|
||||||
|
EncoderFactory encoderFactory = ((JsrSession)session).getEncoderFactory();
|
||||||
|
Object obj = encoderFactory.getEncoderFor(data.getClass());
|
||||||
|
assertThat(obj.getClass(), is(MyEncoder.class));
|
||||||
|
MyEncoder encoder = (MyEncoder)obj;
|
||||||
|
assertThat(encoder.initialized.getCount(), is(0L));
|
||||||
|
|
||||||
|
// Verify the Encoder has not been destroyed, but is destroyed after the session is closed.
|
||||||
|
assertThat(encoder.destroyed.getCount(), is(1L));
|
||||||
|
session.close();
|
||||||
|
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(encoder.destroyed.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue