diff --git a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/WebSocketCoreClient.java b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/WebSocketCoreClient.java index c8db7f1d82f..950231f0fa1 100644 --- a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/WebSocketCoreClient.java +++ b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/WebSocketCoreClient.java @@ -40,13 +40,6 @@ public class WebSocketCoreClient extends ContainerLifeCycle private final WebSocketComponents components; private ClassLoader classLoader; - // TODO: Things to consider for inclusion in this class (or removal if they can be set elsewhere, like HttpClient) - // - AsyncWrite Idle Timeout - // - Bind Address - // - SslContextFactory setup - // - Connect Timeout - // - Cookie Store - public WebSocketCoreClient() { this(null, new WebSocketComponents()); @@ -61,6 +54,8 @@ public class WebSocketCoreClient extends ContainerLifeCycle { if (httpClient == null) httpClient = Objects.requireNonNull(HttpClientProvider.get()); + if (httpClient.getExecutor() == null) + httpClient.setExecutor(webSocketComponents.getExecutor()); this.classLoader = Thread.currentThread().getContextClassLoader(); this.httpClient = httpClient; diff --git a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/internal/HttpClientProvider.java b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/internal/HttpClientProvider.java index f6b855e8525..d1ea884064e 100644 --- a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/internal/HttpClientProvider.java +++ b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/internal/HttpClientProvider.java @@ -14,7 +14,6 @@ package org.eclipse.jetty.websocket.core.client.internal; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.util.thread.QueuedThreadPool; public interface HttpClientProvider { @@ -30,11 +29,7 @@ public interface HttpClientProvider private static HttpClient newDefaultHttpClient() { - HttpClient client = new HttpClient(); - QueuedThreadPool threadPool = new QueuedThreadPool(); - threadPool.setName("WebSocketClient@" + client.hashCode()); - client.setExecutor(threadPool); - return client; + return new HttpClient(); } default HttpClient newHttpClient() diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketComponents.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketComponents.java index f9c6e86e742..76168da640a 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketComponents.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketComponents.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.websocket.core; +import java.util.concurrent.Executor; import java.util.zip.Deflater; import org.eclipse.jetty.io.ByteBufferPool; @@ -22,6 +23,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.compression.CompressionPool; import org.eclipse.jetty.util.compression.DeflaterPool; import org.eclipse.jetty.util.compression.InflaterPool; +import org.eclipse.jetty.util.thread.QueuedThreadPool; /** * A collection of components which are the resources needed for websockets such as @@ -31,6 +33,7 @@ public class WebSocketComponents extends ContainerLifeCycle { private final DecoratedObjectFactory objectFactory; private final WebSocketExtensionRegistry extensionRegistry; + private final Executor executor; private final ByteBufferPool bufferPool; private final InflaterPool inflaterPool; private final DeflaterPool deflaterPool; @@ -42,18 +45,26 @@ public class WebSocketComponents extends ContainerLifeCycle public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory, ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool) + { + this (extensionRegistry, objectFactory, bufferPool, inflaterPool, deflaterPool, null); + } + + public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory, + ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool, Executor executor) { this.extensionRegistry = (extensionRegistry == null) ? new WebSocketExtensionRegistry() : extensionRegistry; this.objectFactory = (objectFactory == null) ? new DecoratedObjectFactory() : objectFactory; this.bufferPool = (bufferPool == null) ? new MappedByteBufferPool() : bufferPool; this.inflaterPool = (inflaterPool == null) ? new InflaterPool(CompressionPool.DEFAULT_CAPACITY, true) : inflaterPool; this.deflaterPool = (deflaterPool == null) ? new DeflaterPool(CompressionPool.DEFAULT_CAPACITY, Deflater.DEFAULT_COMPRESSION, true) : deflaterPool; + this.executor = (executor == null) ? new QueuedThreadPool() : executor; addBean(inflaterPool); addBean(deflaterPool); addBean(bufferPool); addBean(extensionRegistry); addBean(objectFactory); + addBean(executor); } public ByteBufferPool getBufferPool() @@ -61,6 +72,11 @@ public class WebSocketComponents extends ContainerLifeCycle return bufferPool; } + public Executor getExecutor() + { + return executor; + } + public WebSocketExtensionRegistry getExtensionRegistry() { return extensionRegistry; diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java index 875fab0096b..57d31e0d3f6 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.websocket.core.internal.messages; import java.io.Closeable; import java.lang.invoke.MethodHandle; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; @@ -95,10 +96,12 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink { private CompletableFuture dispatchComplete; private MessageSink typeSink; + private final Executor executor; public DispatchedMessageSink(CoreSession session, MethodHandle methodHandle) { super(session, methodHandle); + executor = session.getWebSocketComponents().getExecutor(); } public abstract MessageSink newSink(Frame frame); @@ -112,7 +115,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink // Dispatch to end user function (will likely start with blocking for data/accept). // If the MessageSink can be closed do this after invoking and before completing the CompletableFuture. - new Thread(() -> + executor.execute(() -> { try { @@ -129,7 +132,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink dispatchComplete.completeExceptionally(throwable); } - }).start(); + }); } Callback frameCallback = callback; diff --git a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/WebSocketServerComponents.java b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/WebSocketServerComponents.java index 946882acda1..d5cd19562f2 100644 --- a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/WebSocketServerComponents.java +++ b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/WebSocketServerComponents.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.websocket.core.server; import java.util.Objects; +import java.util.concurrent.Executor; import javax.servlet.ServletContext; import org.eclipse.jetty.io.ByteBufferPool; @@ -40,9 +41,9 @@ public class WebSocketServerComponents extends WebSocketComponents public static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater"; public static final String WEBSOCKET_BUFFER_POOL_ATTRIBUTE = "jetty.websocket.bufferPool"; - WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory) + WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory, Executor executor) { - super(null, objectFactory, bufferPool, inflaterPool, deflaterPool); + super(null, objectFactory, bufferPool, inflaterPool, deflaterPool, executor); } /** @@ -79,8 +80,12 @@ public class WebSocketServerComponents extends WebSocketComponents if (bufferPool == null) bufferPool = server.getBean(ByteBufferPool.class); + Executor executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor"); + if (executor == null) + executor = server.getThreadPool(); + DecoratedObjectFactory objectFactory = (DecoratedObjectFactory)servletContext.getAttribute(DecoratedObjectFactory.ATTR); - WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory); + WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory, executor); if (objectFactory != null) serverComponents.unmanage(objectFactory); @@ -92,6 +97,8 @@ public class WebSocketServerComponents extends WebSocketComponents serverComponents.unmanage(deflaterPool); if (server.contains(bufferPool)) serverComponents.unmanage(bufferPool); + if (executor != null) + serverComponents.unmanage(executor); // Stop the WebSocketComponents when the ContextHandler stops. ContextHandler contextHandler = Objects.requireNonNull(ContextHandler.getContextHandler(servletContext)); diff --git a/jetty-websocket/websocket-javax-server/src/main/java/org/eclipse/jetty/websocket/javax/server/internal/JavaxWebSocketServerContainer.java b/jetty-websocket/websocket-javax-server/src/main/java/org/eclipse/jetty/websocket/javax/server/internal/JavaxWebSocketServerContainer.java index ac488c842da..6683b931244 100644 --- a/jetty-websocket/websocket-javax-server/src/main/java/org/eclipse/jetty/websocket/javax/server/internal/JavaxWebSocketServerContainer.java +++ b/jetty-websocket/websocket-javax-server/src/main/java/org/eclipse/jetty/websocket/javax/server/internal/JavaxWebSocketServerContainer.java @@ -73,12 +73,7 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer if (httpClient == null) httpClient = (HttpClient)contextHandler.getServer().getAttribute(JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE); - Executor executor = httpClient == null ? null : httpClient.getExecutor(); - if (executor == null) - executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) - executor = contextHandler.getServer().getThreadPool(); - + Executor executor = wsComponents.getExecutor(); if (httpClient != null && httpClient.getExecutor() == null) httpClient.setExecutor(executor); @@ -123,23 +118,6 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer private List> deferredEndpointClasses; private List deferredEndpointConfigs; - /** - * Main entry point for {@link JavaxWebSocketServletContainerInitializer}. - * - * @param webSocketMappings the {@link WebSocketMappings} that this container belongs to - */ - public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings) - { - this(webSocketMappings, new WebSocketComponents()); - } - - public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings, WebSocketComponents components) - { - super(components); - this.webSocketMappings = webSocketMappings; - this.frameHandlerFactory = new JavaxWebSocketServerFrameHandlerFactory(this); - } - /** * Main entry point for {@link JavaxWebSocketServletContainerInitializer}. *