Issue #2655 - Removing closed WebSocket Session's from WebSocketClient

+ Correcting Native WebSocketConfiguration impact.
+ CDI requires a customized DecoratedObjectFactory, which is bound
  later in the lifecycle, which means we cannot rely on it being
  provided directly in the constructors, but rather have to look
  for it in the ServletContext.

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2018-06-12 15:41:08 -05:00
parent 395f4394b6
commit 870c87f2f7
7 changed files with 98 additions and 68 deletions

View File

@ -255,7 +255,7 @@ public class DelayedStartClientOnServerTest
}
}
@Test
@Test(timeout = 5000)
public void testHttpClientThreads_AfterServerConnectTo() throws Exception
{
Server server = new Server(0);

View File

@ -144,7 +144,12 @@ public class WebSocketPolicy
public WebSocketPolicy clonePolicy()
{
WebSocketPolicy clone = new WebSocketPolicy(this.behavior);
return clonePolicy(this.behavior);
}
public WebSocketPolicy clonePolicy(WebSocketBehavior behavior)
{
WebSocketPolicy clone = new WebSocketPolicy(behavior);
clone.idleTimeout = this.idleTimeout;
clone.maxTextMessageSize = this.maxTextMessageSize;
clone.maxTextMessageBufferSize = this.maxTextMessageBufferSize;

View File

@ -30,11 +30,13 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -56,8 +58,6 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.scopes.DelegatedContainerScope;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
/**
@ -70,12 +70,16 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
// From HttpClient
private final HttpClient httpClient;
//
private final WebSocketContainerScope containerScope;
// CDI layer
private final Supplier<DecoratedObjectFactory> objectFactorySupplier;
// WebSocket Specifics
private final WebSocketPolicy policy;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private final SessionFactory sessionFactory;
// ID Generator
private final int id = ThreadLocalRandom.current().nextInt();
// defaults to true for backwards compatibility
@ -112,11 +116,13 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(HttpClient httpClient, DecoratedObjectFactory objectFactory)
{
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(),new MappedByteBufferPool(),objectFactory);
this.httpClient = httpClient;
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.httpClient = Objects.requireNonNull(httpClient, "HttpClient");
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
final DecoratedObjectFactory decoratedObjectFactory = objectFactory != null ? objectFactory : newDecoratedObjectFactory();
this.objectFactorySupplier = () -> decoratedObjectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}
/**
@ -232,12 +238,14 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
this.httpClient.setByteBufferPool(bufferPool);
addBean(this.httpClient);
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(), bufferPool, objectFactory);
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
final DecoratedObjectFactory decoratedObjectFactory = objectFactory != null ? objectFactory : newDecoratedObjectFactory();
this.objectFactorySupplier = ()-> decoratedObjectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}
/**
@ -271,19 +279,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory eventDriverFactory, SessionFactory sessionFactory, HttpClient httpClient)
{
WebSocketContainerScope clientScope;
if (scope.getPolicy().getBehavior() == WebSocketBehavior.CLIENT)
{
clientScope = scope;
}
else
{
// We need to wrap the scope
clientScope = new DelegatedContainerScope(WebSocketPolicy.newClientPolicy(), scope);
}
this.containerScope = clientScope;
if(httpClient == null)
{
this.httpClient = HttpClientProvider.get(scope);
@ -294,12 +289,22 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
this.httpClient = httpClient;
}
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.policy = scope.getPolicy().clonePolicy(WebSocketBehavior.CLIENT);
// Support Late Binding of Object Factory (for CDI)
this.objectFactorySupplier = () -> scope.getObjectFactory();
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = eventDriverFactory;
this.sessionFactory = sessionFactory;
}
private DecoratedObjectFactory newDecoratedObjectFactory()
{
DecoratedObjectFactory objectFactory = new DecoratedObjectFactory();
objectFactory.addDecorator(new DeprecationWarning());
return objectFactory;
}
public Future<Session> connect(Object websocket, URI toUri) throws IOException
{
ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
@ -439,7 +444,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getAsyncWriteTimeout()
{
return this.containerScope.getPolicy().getAsyncWriteTimeout();
return getPolicy().getAsyncWriteTimeout();
}
public SocketAddress getBindAddress()
@ -548,7 +553,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
@Override
public DecoratedObjectFactory getObjectFactory()
{
return this.containerScope.getObjectFactory();
return this.objectFactorySupplier.get();
}
public Set<WebSocketSession> getOpenSessions()
@ -559,7 +564,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
@Override
public WebSocketPolicy getPolicy()
{
return this.containerScope.getPolicy();
return this.policy;
}
public Scheduler getScheduler()

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketTimeoutException;
@ -73,15 +83,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
@ -351,6 +352,8 @@ public class ClientCloseTest
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Ignore("Need sbordet's help here")
@ -452,6 +455,7 @@ public class ClientCloseTest
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Test
@ -495,6 +499,7 @@ public class ClientCloseTest
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Test
@ -533,10 +538,17 @@ public class ClientCloseTest
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));
// client close should occur
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL),
anyOf(
containsString("Timeout"),
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Ignore("Issue #2625")
@Test(timeout = 5000L)
public void testStopLifecycle() throws Exception
{
@ -585,6 +597,7 @@ public class ClientCloseTest
{
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
finally
{
@ -638,5 +651,6 @@ public class ClientCloseTest
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
}

View File

@ -82,18 +82,6 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
}
}
@Override
protected void doStart() throws Exception
{
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
}
@Override
public ByteBufferPool getBufferPool()
{

View File

@ -22,7 +22,8 @@ import java.util.Set;
import javax.servlet.ServletContainerInitializer;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import org.eclipse.jetty.server.handler.ContextHandler;
public class NativeWebSocketServletContainerInitializer implements ServletContainerInitializer
{
@ -33,14 +34,23 @@ public class NativeWebSocketServletContainerInitializer implements ServletContai
NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration) context.getAttribute(KEY);
if (configuration == null)
{
// Not provided to us, create a new default one.
configuration = new NativeWebSocketConfiguration(context);
context.setAttribute(KEY, configuration);
// Attach default configuration to context lifecycle
if (context instanceof ContextHandler.Context)
{
ContextHandler handler = ((ContextHandler.Context)context).getContextHandler();
// Let ContextHandler handle configuration lifecycle
handler.addBean(configuration);
}
}
return configuration;
}
@Override
public void onStartup(Set<Class<?>> c, ServletContext ctx) throws ServletException
public void onStartup(Set<Class<?>> c, ServletContext ctx)
{
// initialize
getDefaultFrom(ctx);

View File

@ -315,7 +315,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{
if(this.objectFactory == null)
{
this.objectFactory = findDecorator();
this.objectFactory = findDecoratedObjectFactory();
}
if(this.executor == null)
@ -323,9 +323,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.executor = findExecutor();
}
Objects.requireNonNull(this.objectFactory, DecoratedObjectFactory.class.getName());
Objects.requireNonNull(this.executor, Executor.class.getName());
super.doStart();
}
@ -333,7 +330,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
* Attempt to find the DecoratedObjectFactory that should be used.
* @return the DecoratedObjectFactory that should be used. (never null)
*/
protected DecoratedObjectFactory findDecorator()
private DecoratedObjectFactory findDecoratedObjectFactory()
{
DecoratedObjectFactory objectFactory;
@ -356,18 +353,27 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
* Attempt to find the Executor that should be used.
* @return the Executor that should be used. (never null)
*/
protected Executor findExecutor()
private Executor findExecutor()
{
if(context != null)
{
// Attempt to pull Executor from ServletContext attribute
Executor contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.server.Executor");
// Try websocket specific one first
Executor contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.websocket.Executor");
if(contextExecutor != null)
{
return contextExecutor;
}
// Attempt to pull Executor from Jetty Server, via ContextHandler
// Try ContextHandler version
contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.server.Executor");
if(contextExecutor != null)
{
return contextExecutor;
}
// Try Executor from Jetty Server
ContextHandler contextHandler = ContextHandler.getContextHandler(context);
if (contextHandler != null)
{
@ -379,7 +385,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
}
}
// Create a new one
// All else fails, Create a new one
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("WebSocketServerFactory");
addManaged(threadPool);
@ -408,6 +414,8 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
@Override
public DecoratedObjectFactory getObjectFactory()
{
if(!isStarted())
throw new IllegalStateException("WebSocketServerFactory not started yet");
return objectFactory;
}