Merge pull request #2658 from eclipse/jetty-9.4.x-issue-2655-wsclient-session-removal

Issue #2655 - Removing closed WebSocket Session's from WebSocketClient
This commit is contained in:
Joakim Erdfelt 2018-06-22 10:15:48 -05:00 committed by GitHub
commit 6b335877d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 168 additions and 109 deletions

View File

@ -247,7 +247,7 @@ public class DelayedStartClientOnServerTest
assertThat("Response", response, startsWith("Connected to ws://"));
List<String> threadNames = getThreadNames(server);
assertNoHttpClientPoolThreads(threadNames);
assertThat("Threads", threadNames, hasItem(containsString("WebSocketContainer@")));
assertThat("Threads", threadNames, hasItem(containsString("WebSocketClient@")));
}
finally
{
@ -255,7 +255,7 @@ public class DelayedStartClientOnServerTest
}
}
@Test
@Test(timeout = 5000)
public void testHttpClientThreads_AfterServerConnectTo() throws Exception
{
Server server = new Server(0);

View File

@ -144,7 +144,12 @@ public class WebSocketPolicy
public WebSocketPolicy clonePolicy()
{
WebSocketPolicy clone = new WebSocketPolicy(this.behavior);
return clonePolicy(this.behavior);
}
public WebSocketPolicy clonePolicy(WebSocketBehavior behavior)
{
WebSocketPolicy clone = new WebSocketPolicy(behavior);
clone.idleTimeout = this.idleTimeout;
clone.maxTextMessageSize = this.maxTextMessageSize;
clone.maxTextMessageBufferSize = this.maxTextMessageBufferSize;

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.websocket.client;
import java.util.concurrent.Executor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -31,11 +33,13 @@ class DefaultHttpClientProvider
{
SslContextFactory sslContextFactory = null;
Executor executor = null;
ByteBufferPool bufferPool = null;
if (scope != null)
{
sslContextFactory = scope.getSslContextFactory();
executor = scope.getExecutor();
bufferPool = scope.getBufferPool();
}
if (sslContextFactory == null)
@ -53,6 +57,13 @@ class DefaultHttpClientProvider
executor = threadPool;
}
client.setExecutor(executor);
if (bufferPool == null)
{
bufferPool = new MappedByteBufferPool();
}
client.setByteBufferPool(bufferPool);
return client;
}
}

View File

@ -29,11 +29,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -56,7 +55,6 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.scopes.DelegatedContainerScope;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -70,14 +68,15 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
// From HttpClient
private final HttpClient httpClient;
//
private final WebSocketContainerScope containerScope;
// CDI layer
private final Supplier<DecoratedObjectFactory> objectFactorySupplier;
// WebSocket Specifics
private final WebSocketPolicy policy;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private final SessionFactory sessionFactory;
private final int id = ThreadLocalRandom.current().nextInt();
// defaults to true for backwards compatibility
private boolean stopAtShutdown = true;
@ -86,9 +85,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient()
{
// Create synthetic HttpClient
this(HttpClientProvider.get(null));
addBean(this.httpClient);
this((HttpClient)null);
}
/**
@ -99,7 +96,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(HttpClient httpClient)
{
this(httpClient,new DecoratedObjectFactory());
this(httpClient, null);
}
/**
@ -112,11 +109,20 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(HttpClient httpClient, DecoratedObjectFactory objectFactory)
{
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(),new MappedByteBufferPool(),objectFactory);
this.httpClient = httpClient;
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this(new SimpleContainerScope(new WebSocketPolicy(WebSocketBehavior.CLIENT), null, null, null, objectFactory), null, null, httpClient);
}
/**
* Create a new WebSocketClient
*
* @param sslContextFactory
* ssl context factory to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/
@Deprecated
public WebSocketClient(SslContextFactory sslContextFactory)
{
this(sslContextFactory,null, null);
}
/**
@ -126,10 +132,9 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
* the executor to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/
@Deprecated
public WebSocketClient(Executor executor)
{
this(null,executor);
this(null, executor, null);
}
/**
@ -137,21 +142,12 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*
* @param bufferPool
* byte buffer pool to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/
@Deprecated
public WebSocketClient(ByteBufferPool bufferPool)
{
this(null,null,bufferPool);
}
/**
* Create a new WebSocketClient
*
* @param sslContextFactory
* ssl context factory to use
*/
public WebSocketClient(SslContextFactory sslContextFactory)
{
this(sslContextFactory,null);
this(null, null, bufferPool);
}
/**
@ -166,7 +162,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
@Deprecated
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
{
this(sslContextFactory,executor,new MappedByteBufferPool());
this(sslContextFactory, executor, null);
}
/**
@ -178,7 +174,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(WebSocketContainerScope scope)
{
this(scope.getSslContextFactory(),scope.getExecutor(),scope.getBufferPool(),scope.getObjectFactory());
this(scope, null, null, null);
}
/**
@ -193,7 +189,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
{
this(sslContextFactory,scope.getExecutor(),scope.getBufferPool(),scope.getObjectFactory());
this(sslContextFactory, scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
}
/**
@ -209,7 +205,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
{
this(sslContextFactory,executor,bufferPool,new DecoratedObjectFactory());
this(sslContextFactory, executor, bufferPool, null);
}
/**
@ -227,17 +223,8 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
private WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
{
this.httpClient = new HttpClient(sslContextFactory);
this.httpClient.setExecutor(executor);
this.httpClient.setByteBufferPool(bufferPool);
this(new SimpleContainerScope(new WebSocketPolicy(WebSocketBehavior.CLIENT), bufferPool, executor, sslContextFactory, objectFactory));
addBean(this.httpClient);
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(), bufferPool, objectFactory);
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
}
/**
@ -271,20 +258,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory eventDriverFactory, SessionFactory sessionFactory, HttpClient httpClient)
{
WebSocketContainerScope clientScope;
if (scope.getPolicy().getBehavior() == WebSocketBehavior.CLIENT)
{
clientScope = scope;
}
else
{
// We need to wrap the scope
clientScope = new DelegatedContainerScope(WebSocketPolicy.newClientPolicy(), scope);
}
this.containerScope = clientScope;
if(httpClient == null)
if (httpClient == null)
{
this.httpClient = HttpClientProvider.get(scope);
addBean(this.httpClient);
@ -294,10 +268,14 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
this.httpClient = httpClient;
}
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
// Ensure we get a Client version of the policy.
this.policy = scope.getPolicy().clonePolicy(WebSocketBehavior.CLIENT);
// Support Late Binding of Object Factory (for CDI)
this.objectFactorySupplier = () -> scope.getObjectFactory();
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = eventDriverFactory;
this.sessionFactory = sessionFactory;
this.eventDriverFactory = eventDriverFactory == null ? new EventDriverFactory(this) : eventDriverFactory;
this.sessionFactory = sessionFactory == null ? new WebSocketSessionFactory(this) : sessionFactory;
}
public Future<Session> connect(Object websocket, URI toUri) throws IOException
@ -439,7 +417,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getAsyncWriteTimeout()
{
return this.containerScope.getPolicy().getAsyncWriteTimeout();
return getPolicy().getAsyncWriteTimeout();
}
public SocketAddress getBindAddress()
@ -548,7 +526,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
@Override
public DecoratedObjectFactory getObjectFactory()
{
return this.containerScope.getObjectFactory();
return this.objectFactorySupplier.get();
}
public Set<WebSocketSession> getOpenSessions()
@ -559,7 +537,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
@Override
public WebSocketPolicy getPolicy()
{
return this.containerScope.getPolicy();
return this.policy;
}
public Scheduler getScheduler()
@ -753,11 +731,27 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
return stopAtShutdown;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof WebSocketClient)) return false;
WebSocketClient that = (WebSocketClient) o;
return Objects.equals(this.httpClient, that.httpClient) &&
Objects.equals(this.policy, that.policy);
}
@Override
public int hashCode()
{
return Objects.hash(httpClient, policy);
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder("WebSocketClient@");
sb.append(Integer.toHexString(id));
sb.append(Integer.toHexString(hashCode()));
sb.append("[httpClient=").append(httpClient);
sb.append(",openSessions.size=");
sb.append(getOpenSessions().size());

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketTimeoutException;
@ -73,15 +83,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
@ -351,6 +352,8 @@ public class ClientCloseTest
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Ignore("Need sbordet's help here")
@ -452,6 +455,7 @@ public class ClientCloseTest
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Test
@ -495,6 +499,7 @@ public class ClientCloseTest
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Test
@ -533,10 +538,17 @@ public class ClientCloseTest
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));
// client close should occur
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL),
anyOf(
containsString("Timeout"),
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
@Ignore("Issue #2625")
@Test(timeout = 5000L)
public void testStopLifecycle() throws Exception
{
@ -585,6 +597,7 @@ public class ClientCloseTest
{
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
finally
{
@ -638,5 +651,6 @@ public class ClientCloseTest
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
}

View File

@ -23,9 +23,11 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -54,6 +56,11 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
}
public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool, Executor executor, DecoratedObjectFactory objectFactory)
{
this(policy, bufferPool, executor, null, objectFactory);
}
public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool, Executor executor, SslContextFactory ssl, DecoratedObjectFactory objectFactory)
{
this.policy = policy;
this.bufferPool = bufferPool;
@ -61,16 +68,34 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
if (objectFactory == null)
{
this.objectFactory = new DecoratedObjectFactory();
this.objectFactory.addDecorator(new DeprecationWarning());
}
else
{
this.objectFactory = objectFactory;
}
if(ssl == null)
{
this.sslContextFactory = new SslContextFactory();
}
else
{
this.sslContextFactory = ssl;
}
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
String name = "WebSocketContainer@" + hashCode();
String behavior = "Container";
if (policy != null)
{
if (policy.getBehavior() == WebSocketBehavior.CLIENT)
behavior = "Client";
else if (policy.getBehavior() == WebSocketBehavior.SERVER)
behavior = "Server";
}
String name = String.format("WebSocket%s@%s", behavior, hashCode());
threadPool.setName(name);
threadPool.setDaemon(true);
this.executor = threadPool;
@ -82,18 +107,6 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
}
}
@Override
protected void doStart() throws Exception
{
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
}
@Override
public ByteBufferPool getBufferPool()
{

View File

@ -22,7 +22,8 @@ import java.util.Set;
import javax.servlet.ServletContainerInitializer;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import org.eclipse.jetty.server.handler.ContextHandler;
public class NativeWebSocketServletContainerInitializer implements ServletContainerInitializer
{
@ -33,14 +34,23 @@ public class NativeWebSocketServletContainerInitializer implements ServletContai
NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration) context.getAttribute(KEY);
if (configuration == null)
{
// Not provided to us, create a new default one.
configuration = new NativeWebSocketConfiguration(context);
context.setAttribute(KEY, configuration);
// Attach default configuration to context lifecycle
if (context instanceof ContextHandler.Context)
{
ContextHandler handler = ((ContextHandler.Context)context).getContextHandler();
// Let ContextHandler handle configuration lifecycle
handler.addManaged(configuration);
}
}
return configuration;
}
@Override
public void onStartup(Set<Class<?>> c, ServletContext ctx) throws ServletException
public void onStartup(Set<Class<?>> c, ServletContext ctx)
{
// initialize
getDefaultFrom(ctx);

View File

@ -315,7 +315,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{
if(this.objectFactory == null)
{
this.objectFactory = findDecorator();
this.objectFactory = findDecoratedObjectFactory();
}
if(this.executor == null)
@ -323,9 +323,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.executor = findExecutor();
}
Objects.requireNonNull(this.objectFactory, DecoratedObjectFactory.class.getName());
Objects.requireNonNull(this.executor, Executor.class.getName());
super.doStart();
}
@ -333,7 +330,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
* Attempt to find the DecoratedObjectFactory that should be used.
* @return the DecoratedObjectFactory that should be used. (never null)
*/
protected DecoratedObjectFactory findDecorator()
private DecoratedObjectFactory findDecoratedObjectFactory()
{
DecoratedObjectFactory objectFactory;
@ -356,30 +353,45 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
* Attempt to find the Executor that should be used.
* @return the Executor that should be used. (never null)
*/
protected Executor findExecutor()
private Executor findExecutor()
{
if(context != null)
// Try as bean
Executor executor = getBean(Executor.class);
if (executor != null)
{
return executor;
}
// Attempt to pull Executor from ServletContext attribute
Executor contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.server.Executor");
if(contextExecutor != null)
if (context != null)
{
// Try websocket specific one first
Executor contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.websocket.Executor");
if (contextExecutor != null)
{
return contextExecutor;
}
// Attempt to pull Executor from Jetty Server, via ContextHandler
// Try ContextHandler version
contextExecutor = (Executor) context.getAttribute("org.eclipse.jetty.server.Executor");
if (contextExecutor != null)
{
return contextExecutor;
}
// Try Executor from Jetty Server
ContextHandler contextHandler = ContextHandler.getContextHandler(context);
if (contextHandler != null)
{
contextExecutor = contextHandler.getServer().getThreadPool();
if(contextExecutor != null)
if (contextExecutor != null) // This should always be true!
{
return contextExecutor;
}
}
}
// Create a new one
// All else fails, Create a new one
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("WebSocketServerFactory");
addManaged(threadPool);