Merge pull request #6585 from eclipse/jetty-10.0.x-6566-WebSocketExecutor

Issue #6566 - add executor to WebSocketComponents & use for Dispatched Messages
This commit is contained in:
Lachlan 2021-08-05 15:38:20 +10:00 committed by GitHub
commit 8de7d55bd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 143 additions and 83 deletions

View File

@ -40,13 +40,6 @@ public class WebSocketCoreClient extends ContainerLifeCycle
private final WebSocketComponents components;
private ClassLoader classLoader;
// TODO: Things to consider for inclusion in this class (or removal if they can be set elsewhere, like HttpClient)
// - AsyncWrite Idle Timeout
// - Bind Address
// - SslContextFactory setup
// - Connect Timeout
// - Cookie Store
public WebSocketCoreClient()
{
this(null, new WebSocketComponents());
@ -61,6 +54,8 @@ public class WebSocketCoreClient extends ContainerLifeCycle
{
if (httpClient == null)
httpClient = Objects.requireNonNull(HttpClientProvider.get());
if (httpClient.getExecutor() == null)
httpClient.setExecutor(webSocketComponents.getExecutor());
this.classLoader = Thread.currentThread().getContextClassLoader();
this.httpClient = httpClient;

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.client.internal;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public interface HttpClientProvider
{
@ -30,11 +29,7 @@ public interface HttpClientProvider
private static HttpClient newDefaultHttpClient()
{
HttpClient client = new HttpClient();
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("WebSocketClient@" + client.hashCode());
client.setExecutor(threadPool);
return client;
return new HttpClient();
}
default HttpClient newHttpClient()

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.websocket.core;
import java.util.concurrent.Executor;
import java.util.zip.Deflater;
import org.eclipse.jetty.io.ByteBufferPool;
@ -22,6 +23,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.compression.CompressionPool;
import org.eclipse.jetty.util.compression.DeflaterPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
/**
* A collection of components which are the resources needed for websockets such as
@ -29,11 +31,12 @@ import org.eclipse.jetty.util.compression.InflaterPool;
*/
public class WebSocketComponents extends ContainerLifeCycle
{
private final DecoratedObjectFactory objectFactory;
private final WebSocketExtensionRegistry extensionRegistry;
private final ByteBufferPool bufferPool;
private final InflaterPool inflaterPool;
private final DeflaterPool deflaterPool;
private final DecoratedObjectFactory _objectFactory;
private final WebSocketExtensionRegistry _extensionRegistry;
private final Executor _executor;
private final ByteBufferPool _bufferPool;
private final InflaterPool _inflaterPool;
private final DeflaterPool _deflaterPool;
public WebSocketComponents()
{
@ -43,41 +46,64 @@ public class WebSocketComponents extends ContainerLifeCycle
public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory,
ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool)
{
this.extensionRegistry = (extensionRegistry == null) ? new WebSocketExtensionRegistry() : extensionRegistry;
this.objectFactory = (objectFactory == null) ? new DecoratedObjectFactory() : objectFactory;
this.bufferPool = (bufferPool == null) ? new MappedByteBufferPool() : bufferPool;
this.inflaterPool = (inflaterPool == null) ? new InflaterPool(CompressionPool.DEFAULT_CAPACITY, true) : inflaterPool;
this.deflaterPool = (deflaterPool == null) ? new DeflaterPool(CompressionPool.DEFAULT_CAPACITY, Deflater.DEFAULT_COMPRESSION, true) : deflaterPool;
this (extensionRegistry, objectFactory, bufferPool, inflaterPool, deflaterPool, null);
}
addBean(inflaterPool);
addBean(deflaterPool);
addBean(bufferPool);
addBean(extensionRegistry);
addBean(objectFactory);
public WebSocketComponents(WebSocketExtensionRegistry extensionRegistry, DecoratedObjectFactory objectFactory,
ByteBufferPool bufferPool, InflaterPool inflaterPool, DeflaterPool deflaterPool, Executor executor)
{
_extensionRegistry = (extensionRegistry == null) ? new WebSocketExtensionRegistry() : extensionRegistry;
_objectFactory = (objectFactory == null) ? new DecoratedObjectFactory() : objectFactory;
_bufferPool = (bufferPool == null) ? new MappedByteBufferPool() : bufferPool;
_inflaterPool = (inflaterPool == null) ? new InflaterPool(CompressionPool.DEFAULT_CAPACITY, true) : inflaterPool;
_deflaterPool = (deflaterPool == null) ? new DeflaterPool(CompressionPool.DEFAULT_CAPACITY, Deflater.DEFAULT_COMPRESSION, true) : deflaterPool;
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("WebSocket@" + hashCode());
_executor = threadPool;
}
else
{
_executor = executor;
}
addBean(_inflaterPool);
addBean(_deflaterPool);
addBean(_bufferPool);
addBean(_extensionRegistry);
addBean(_objectFactory);
addBean(_executor);
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
return _bufferPool;
}
public Executor getExecutor()
{
return _executor;
}
public WebSocketExtensionRegistry getExtensionRegistry()
{
return extensionRegistry;
return _extensionRegistry;
}
public DecoratedObjectFactory getObjectFactory()
{
return objectFactory;
return _objectFactory;
}
public InflaterPool getInflaterPool()
{
return inflaterPool;
return _inflaterPool;
}
public DeflaterPool getDeflaterPool()
{
return deflaterPool;
return _deflaterPool;
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.websocket.core.internal.messages;
import java.io.Closeable;
import java.lang.invoke.MethodHandle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
@ -95,10 +96,12 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
{
private CompletableFuture<Void> dispatchComplete;
private MessageSink typeSink;
private final Executor executor;
public DispatchedMessageSink(CoreSession session, MethodHandle methodHandle)
{
super(session, methodHandle);
executor = session.getWebSocketComponents().getExecutor();
}
public abstract MessageSink newSink(Frame frame);
@ -112,7 +115,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
// Dispatch to end user function (will likely start with blocking for data/accept).
// If the MessageSink can be closed do this after invoking and before completing the CompletableFuture.
new Thread(() ->
executor.execute(() ->
{
try
{
@ -129,7 +132,7 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
dispatchComplete.completeExceptionally(throwable);
}
}).start();
});
}
Callback frameCallback = callback;

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.websocket.core.server;
import java.util.Objects;
import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
import org.eclipse.jetty.io.ByteBufferPool;
@ -40,9 +41,9 @@ public class WebSocketServerComponents extends WebSocketComponents
public static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater";
public static final String WEBSOCKET_BUFFER_POOL_ATTRIBUTE = "jetty.websocket.bufferPool";
WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
WebSocketServerComponents(InflaterPool inflaterPool, DeflaterPool deflaterPool, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory, Executor executor)
{
super(null, objectFactory, bufferPool, inflaterPool, deflaterPool);
super(null, objectFactory, bufferPool, inflaterPool, deflaterPool, executor);
}
/**
@ -79,8 +80,12 @@ public class WebSocketServerComponents extends WebSocketComponents
if (bufferPool == null)
bufferPool = server.getBean(ByteBufferPool.class);
Executor executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor");
if (executor == null)
executor = server.getThreadPool();
DecoratedObjectFactory objectFactory = (DecoratedObjectFactory)servletContext.getAttribute(DecoratedObjectFactory.ATTR);
WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory);
WebSocketComponents serverComponents = new WebSocketServerComponents(inflaterPool, deflaterPool, bufferPool, objectFactory, executor);
if (objectFactory != null)
serverComponents.unmanage(objectFactory);
@ -92,6 +97,8 @@ public class WebSocketServerComponents extends WebSocketComponents
serverComponents.unmanage(deflaterPool);
if (server.contains(bufferPool))
serverComponents.unmanage(bufferPool);
if (executor != null)
serverComponents.unmanage(executor);
// Stop the WebSocketComponents when the ContextHandler stops.
ContextHandler contextHandler = Objects.requireNonNull(ContextHandler.getContextHandler(servletContext));

View File

@ -28,17 +28,21 @@ import org.junit.jupiter.api.BeforeAll;
public abstract class AbstractJavaxWebSocketFrameHandlerTest
{
protected static DummyContainer container;
private static WebSocketComponents components;
@BeforeAll
public static void initContainer() throws Exception
{
container = new DummyContainer();
container.start();
components = new WebSocketComponents();
components.start();
}
@AfterAll
public static void stopContainer() throws Exception
{
components.stop();
container.stop();
}
@ -48,7 +52,6 @@ public abstract class AbstractJavaxWebSocketFrameHandlerTest
protected EndpointConfig endpointConfig;
protected CoreSession coreSession = new CoreSession.Empty()
{
private final WebSocketComponents components = new WebSocketComponents();
@Override
public WebSocketComponents getWebSocketComponents()

View File

@ -18,6 +18,7 @@ import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -25,16 +26,26 @@ public abstract class AbstractSessionTest
{
protected static JavaxWebSocketSession session;
protected static JavaxWebSocketContainer container;
protected static WebSocketComponents components;
@BeforeAll
public static void initSession() throws Exception
{
container = new DummyContainer();
container.start();
components = new WebSocketComponents();
components.start();
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
CoreSession coreSession = new CoreSession.Empty();
CoreSession coreSession = new CoreSession.Empty()
{
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
};
session = new JavaxWebSocketSession(container, coreSession, frameHandler, container.getFrameHandlerFactory()
.newDefaultEndpointConfig(websocketPojo.getClass()));
}
@ -42,6 +53,7 @@ public abstract class AbstractSessionTest
@AfterAll
public static void stopContainer() throws Exception
{
components.stop();
container.stop();
}

View File

@ -73,12 +73,7 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
if (httpClient == null)
httpClient = (HttpClient)contextHandler.getServer().getAttribute(JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE);
Executor executor = httpClient == null ? null : httpClient.getExecutor();
if (executor == null)
executor = (Executor)servletContext.getAttribute("org.eclipse.jetty.server.Executor");
if (executor == null)
executor = contextHandler.getServer().getThreadPool();
Executor executor = wsComponents.getExecutor();
if (httpClient != null && httpClient.getExecutor() == null)
httpClient.setExecutor(executor);
@ -123,23 +118,6 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
private List<Class<?>> deferredEndpointClasses;
private List<ServerEndpointConfig> deferredEndpointConfigs;
/**
* Main entry point for {@link JavaxWebSocketServletContainerInitializer}.
*
* @param webSocketMappings the {@link WebSocketMappings} that this container belongs to
*/
public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings)
{
this(webSocketMappings, new WebSocketComponents());
}
public JavaxWebSocketServerContainer(WebSocketMappings webSocketMappings, WebSocketComponents components)
{
super(components);
this.webSocketMappings = webSocketMappings;
this.frameHandlerFactory = new JavaxWebSocketServerFrameHandlerFactory(this);
}
/**
* Main entry point for {@link JavaxWebSocketServletContainerInitializer}.
*

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.websocket.javax.tests.client;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.javax.client.internal.BasicClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
@ -29,16 +30,26 @@ public abstract class AbstractClientSessionTest
{
protected static JavaxWebSocketSession session;
protected static JavaxWebSocketContainer container;
protected static WebSocketComponents components;
@BeforeAll
public static void initSession() throws Exception
{
container = new JavaxWebSocketClientContainer();
container.start();
components = new WebSocketComponents();
components.start();
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
CoreSession coreSession = new CoreSession.Empty();
CoreSession coreSession = new CoreSession.Empty()
{
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
};
session = new JavaxWebSocketSession(container, coreSession, frameHandler, new BasicClientEndpointConfig());
}

View File

@ -27,10 +27,13 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -38,6 +41,20 @@ import static org.hamcrest.Matchers.is;
public class JavaxWebSocketFrameHandlerOnMessageTextStreamTest extends AbstractJavaxWebSocketServerFrameHandlerTest
{
private static final WebSocketComponents components = new WebSocketComponents();
@BeforeAll
public static void beforeAll() throws Exception
{
components.start();
}
@AfterAll
public static void afterAll() throws Exception
{
components.stop();
}
@SuppressWarnings("Duplicates")
private <T extends WSEventTracker> T performOnMessageInvocation(T socket, Consumer<JavaxWebSocketFrameHandler> func) throws Exception
{
@ -46,7 +63,14 @@ public class JavaxWebSocketFrameHandlerOnMessageTextStreamTest extends AbstractJ
// Establish endpoint function
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request);
frameHandler.onOpen(new CoreSession.Empty(), Callback.NOOP);
frameHandler.onOpen(new CoreSession.Empty()
{
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
}, Callback.NOOP);
func.accept(frameHandler);
return socket;
}

View File

@ -85,10 +85,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
{
coreClient = new WebSocketCoreClient(httpClient, components);
addManaged(coreClient);
if (httpClient == null)
coreClient.getHttpClient().setName("Jetty-WebSocketClient@" + hashCode());
frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this, components);
sessionListeners.add(sessionTracker);
addBean(sessionTracker);

View File

@ -41,7 +41,7 @@ public class HttpClientInitTest
assertThat("Executor exists", executor, notNullValue());
assertThat("Executor instanceof", executor, instanceOf(QueuedThreadPool.class));
QueuedThreadPool threadPool = (QueuedThreadPool)executor;
assertThat("QueuedThreadPool.name", threadPool.getName(), startsWith("WebSocketClient@"));
assertThat("QueuedThreadPool.name", threadPool.getName(), startsWith("WebSocket@"));
}
finally
{

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
@ -46,6 +47,10 @@ public class JettyWebSocketFrameHandlerTest
{
private static DummyContainer container;
private final WebSocketComponents components;
private final JettyWebSocketFrameHandlerFactory endpointFactory;
private final CoreSession coreSession;
@BeforeAll
public static void startContainer() throws Exception
{
@ -59,22 +64,27 @@ public class JettyWebSocketFrameHandlerTest
container.stop();
}
private final WebSocketComponents components = new WebSocketComponents();
private final JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(container, components);
private final CoreSession coreSession = new CoreSession.Empty()
public JettyWebSocketFrameHandlerTest()
{
@Override
public Behavior getBehavior()
components = new WebSocketComponents();
endpointFactory = new JettyWebSocketFrameHandlerFactory(container, components);
coreSession = new CoreSession.Empty()
{
return Behavior.CLIENT;
}
@Override
public Behavior getBehavior()
{
return Behavior.CLIENT;
}
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
};
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
};
LifeCycle.start(components);
}
private JettyWebSocketFrameHandler newLocalFrameHandler(Object wsEndpoint)
{