Issue #6566 - add executor to WebSocketComponents & use for Dispatched Messages

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-08-04 10:05:50 +10:00
parent 90a72b0798
commit bbabaee8cc
6 changed files with 35 additions and 41 deletions

View File

@ -40,13 +40,6 @@ public class WebSocketCoreClient extends ContainerLifeCycle
private final WebSocketComponents components;
private ClassLoader classLoader;
// TODO: Things to consider for inclusion in this class (or removal if they can be set elsewhere, like HttpClient)
// - AsyncWrite Idle Timeout
// - Bind Address
// - SslContextFactory setup
// - Connect Timeout
// - Cookie Store
public WebSocketCoreClient()
{
this(null, new WebSocketComponents());
@ -61,6 +54,8 @@ public class WebSocketCoreClient extends ContainerLifeCycle
{
if (httpClient == null)
httpClient = Objects.requireNonNull(HttpClientProvider.get());
if (httpClient.getExecutor() == null)
httpClient.setExecutor(webSocketComponents.getExecutor());
this.classLoader = Thread.currentThread().getContextClassLoader();
this.httpClient = httpClient;

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.client.internal;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public interface HttpClientProvider
{
@ -30,11 +29,7 @@ public interface HttpClientProvider
private static HttpClient newDefaultHttpClient()
{
HttpClient client = new HttpClient();
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("WebSocketClient@" + client.hashCode());
client.setExecutor(threadPool);
return client;
return new HttpClient();
}
default HttpClient newHttpClient()

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.websocket.core;
import java.util.concurrent.Executor;
import java.util.zip.Deflater;
import org.eclipse.jetty.io.ByteBufferPool;
@ -22,6 +23,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.compression.CompressionPool;
import org.eclipse.jetty.util.compression.DeflaterPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
/**
* A collection of components which are the resources needed for websockets such as
@ -31,6 +33,7 @@ public class WebSocketComponents extends ContainerLifeCycle
{
private final DecoratedObjectFactory objectFactory;
private final WebSocketExtensionRegistry extensionRegistry;
private final Executor executor;
private final ByteBufferPool bufferPool;
private final InflaterPool inflaterPool;
private final DeflaterPool deflaterPool;
@ -42,18 +45,26 @@ public class WebSocketComponents extends ContainerLifeCycle
public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory,
ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool)
{
this (extensionRegistry, objectFactory, bufferPool, inflaterPool, deflaterPool, null);
}
public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory,
ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool, Executor executor)
{
this.extensionRegistry = (extensionRegistry == null) ? new WebSocketExtensionRegistry() : extensionRegistry;
this.objectFactory = (objectFactory == null) ? new DecoratedObjectFactory() : objectFactory;
this.bufferPool = (bufferPool == null) ? new MappedByteBufferPool() : bufferPool;
this.inflaterPool = (inflaterPool == null) ? new InflaterPool(CompressionPool.DEFAULT_CAPACITY, true) : inflaterPool;
this.deflaterPool = (deflaterPool == null) ? new DeflaterPool(CompressionPool.DEFAULT_CAPACITY, Deflater.DEFAULT_COMPRESSION, true) : deflaterPool;
this.executor = (executor == null) ? new QueuedThreadPool() : executor;
addBean(inflaterPool);
addBean(deflaterPool);
addBean(bufferPool);
addBean(extensionRegistry);
addBean(objectFactory);
addBean(executor);
}
public ByteBufferPool getBufferPool()
@ -61,6 +72,11 @@ public class WebSocketComponents extends ContainerLifeCycle
return bufferPool;
}
public Executor getExecutor()
{
return executor;
}
public WebSocketExtensionRegistry getExtensionRegistry()
{
return extensionRegistry;

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.websocket.core.internal.messages;
import java.io.Closeable;
import java.lang.invoke.MethodHandle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
@ -95,10 +96,12 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
{
private CompletableFuture<Void> dispatchComplete;
private MessageSink typeSink;
private final Executor executor;
public DispatchedMessageSink(CoreSession session, MethodHandle methodHandle)
{
super(session, methodHandle);
executor = session.getWebSocketComponents().getExecutor();
}
public abstract MessageSink newSink(Frame frame);
@ -112,7 +115,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
// Dispatch to end user function (will likely start with blocking for data/accept).
// If the MessageSink can be closed do this after invoking and before completing the CompletableFuture.
new Thread(() ->
executor.execute(() ->
{
try
{
@ -129,7 +132,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
dispatchComplete.completeExceptionally(throwable);
}
}).start();
});
}
Callback frameCallback = callback;

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.websocket.core.server;
import java.util.Objects;
import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
import org.eclipse.jetty.io.ByteBufferPool;
@ -40,9 +41,9 @@ public class WebSocketServerComponents extends WebSocketComponents
public static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater";
public static final String WEBSOCKET_BUFFER_POOL_ATTRIBUTE = "jetty.websocket.bufferPool";
WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory, Executor executor)
{
super(null, objectFactory, bufferPool, inflaterPool, deflaterPool);
super(null, objectFactory, bufferPool, inflaterPool, deflaterPool, executor);
}
/**
@ -79,8 +80,12 @@ public class WebSocketServerComponents extends WebSocketComponents
if (bufferPool == null)
bufferPool = server.getBean(ByteBufferPool.class);
Executor executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor");
if (executor == null)
executor = server.getThreadPool();
DecoratedObjectFactory objectFactory = (DecoratedObjectFactory)servletContext.getAttribute(DecoratedObjectFactory.ATTR);
WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory);
WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory, executor);
if (objectFactory != null)
serverComponents.unmanage(objectFactory);
@ -92,6 +97,8 @@ public class WebSocketServerComponents extends WebSocketComponents
serverComponents.unmanage(deflaterPool);
if (server.contains(bufferPool))
serverComponents.unmanage(bufferPool);
if (executor != null)
serverComponents.unmanage(executor);
// Stop the WebSocketComponents when the ContextHandler stops.
ContextHandler contextHandler = Objects.requireNonNull(ContextHandler.getContextHandler(servletContext));

View File

@ -73,12 +73,7 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
if (httpClient == null)
httpClient = (HttpClient)contextHandler.getServer().getAttribute(JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE);
Executor executor = httpClient == null ? null : httpClient.getExecutor();
if (executor == null)
executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor");
if (executor == null)
executor = contextHandler.getServer().getThreadPool();
Executor executor = wsComponents.getExecutor();
if (httpClient != null && httpClient.getExecutor() == null)
httpClient.setExecutor(executor);
@ -123,23 +118,6 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
private List<Class<?>> deferredEndpointClasses;
private List<ServerEndpointConfig> deferredEndpointConfigs;
/**
* Main entry point for {@link JavaxWebSocketServletContainerInitializer}.
*
* @param webSocketMappings the {@link WebSocketMappings} that this container belongs to
*/
public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings)
{
this(webSocketMappings, new WebSocketComponents());
}
public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings, WebSocketComponents components)
{
super(components);
this.webSocketMappings = webSocketMappings;
this.frameHandlerFactory = new JavaxWebSocketServerFrameHandlerFactory(this);
}
/**
* Main entry point for {@link JavaxWebSocketServletContainerInitializer}.
*