Fixes #10547 - Cannot customize Executor on WebSocketClient. (#10589)

The HttpClient and WebSocketComponents will now try to share as many components as possible.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-09-28 11:32:53 +02:00 committed by GitHub
parent e76ce45ad7
commit 07f320ab6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 94 deletions

View File

@ -36,7 +36,7 @@ public class WebSocketCoreClient extends ContainerLifeCycle
public static final String WEBSOCKET_CORECLIENT_ATTRIBUTE = WebSocketCoreClient.class.getName(); public static final String WEBSOCKET_CORECLIENT_ATTRIBUTE = WebSocketCoreClient.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(WebSocketCoreClient.class); private static final Logger LOG = LoggerFactory.getLogger(WebSocketCoreClient.class);
private final HttpClient httpClient; private final HttpClient client;
private final WebSocketComponents components; private final WebSocketComponents components;
private ClassLoader classLoader; private ClassLoader classLoader;
@ -52,16 +52,25 @@ public class WebSocketCoreClient extends ContainerLifeCycle
public WebSocketCoreClient(HttpClient httpClient, WebSocketComponents webSocketComponents) public WebSocketCoreClient(HttpClient httpClient, WebSocketComponents webSocketComponents)
{ {
if (httpClient == null) client = Objects.requireNonNullElse(httpClient, HttpClientProvider.get());
httpClient = Objects.requireNonNull(HttpClientProvider.get()); addBean(client);
if (httpClient.getExecutor() == null) if (webSocketComponents == null)
httpClient.setExecutor(webSocketComponents.getExecutor()); {
if (client.isStarted())
this.classLoader = Thread.currentThread().getContextClassLoader(); webSocketComponents = new WebSocketComponents(null, null, client.getByteBufferPool(), null, null, client.getExecutor());
this.httpClient = httpClient; else
this.components = webSocketComponents; webSocketComponents = new WebSocketComponents();
addBean(httpClient); }
addBean(webSocketComponents); components = webSocketComponents;
addBean(components);
if (!client.isStarted())
{
if (client.getByteBufferPool() == null)
client.setByteBufferPool(components.getBufferPool());
if (client.getExecutor() == null)
client.setExecutor(components.getExecutor());
}
classLoader = Thread.currentThread().getContextClassLoader();
} }
public ClassLoader getClassLoader() public ClassLoader getClassLoader()
@ -112,7 +121,7 @@ public class WebSocketCoreClient extends ContainerLifeCycle
public HttpClient getHttpClient() public HttpClient getHttpClient()
{ {
return httpClient; return client;
} }
public DecoratedObjectFactory getObjectFactory() public DecoratedObjectFactory getObjectFactory()

View File

@ -49,7 +49,6 @@ import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.common.SessionTracker; import org.eclipse.jetty.websocket.common.SessionTracker;
import org.eclipse.jetty.websocket.core.Configuration; import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.client.UpgradeListener; import org.eclipse.jetty.websocket.core.client.UpgradeListener;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -64,12 +63,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
private final List<WebSocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>(); private final List<WebSocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>();
private final SessionTracker sessionTracker = new SessionTracker(); private final SessionTracker sessionTracker = new SessionTracker();
private final Configuration.ConfigurationCustomizer configurationCustomizer = new Configuration.ConfigurationCustomizer(); private final Configuration.ConfigurationCustomizer configurationCustomizer = new Configuration.ConfigurationCustomizer();
private final WebSocketComponents components;
private boolean stopAtShutdown = false; private boolean stopAtShutdown = false;
private long _stopTimeout = Long.MAX_VALUE; private long _stopTimeout = Long.MAX_VALUE;
/** /**
* Instantiate a WebSocketClient with defaults * Instantiates a WebSocketClient with a default {@link HttpClient}.
*/ */
public WebSocketClient() public WebSocketClient()
{ {
@ -77,48 +75,15 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
} }
/** /**
* <p> * <p>Instantiates a WebSocketClient with the given {@link HttpClient}.</p>
* Instantiate a WebSocketClient.
* </p>
* *
* <p> * @param httpClient the HttpClient to use
* HTTP behaviors of the WebSocket upgrade are taken from the HttpClient configuration.
* </p>
*
* @param httpClient the HttpClient to base internal defaults off of
*/ */
public WebSocketClient(HttpClient httpClient) public WebSocketClient(HttpClient httpClient)
{ {
this (httpClient, coreClient = new WebSocketCoreClient(httpClient, null);
new WebSocketComponents(
null,
null,
httpClient != null ? httpClient.getByteBufferPool() : null,
null,
null,
httpClient != null ? httpClient.getExecutor() : null)
);
}
/**
* <p>
* Instantiate a WebSocketClient.
* </p>
*
* <p>
* HTTP behaviors of the WebSocket upgrade are taken from the {@link HttpClient} configuration.
* WebSocket behaviors are taken from the {@link WebSocketComponents} configuration.
* </p>
*
* @param httpClient the HttpClient to use for the HTTP behaviors of WebSocket upgrade
* @param webSocketComponents the WebSocketComponents to use for WebSocket behaviors
*/
public WebSocketClient(HttpClient httpClient, WebSocketComponents webSocketComponents)
{
components = webSocketComponents;
coreClient = new WebSocketCoreClient(httpClient, components);
addManaged(coreClient); addManaged(coreClient);
frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this, components); frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this, coreClient.getWebSocketComponents());
sessionListeners.add(sessionTracker); sessionListeners.add(sessionTracker);
addBean(sessionTracker); addBean(sessionTracker);
} }
@ -369,13 +334,13 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
public ByteBufferPool getBufferPool() public ByteBufferPool getBufferPool()
{ {
return components.getBufferPool(); return getHttpClient().getByteBufferPool();
} }
@Override @Override
public Executor getExecutor() public Executor getExecutor()
{ {
return components.getExecutor(); return getHttpClient().getExecutor();
} }
public HttpClient getHttpClient() public HttpClient getHttpClient()
@ -385,7 +350,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
public DecoratedObjectFactory getObjectFactory() public DecoratedObjectFactory getObjectFactory()
{ {
return components.getObjectFactory(); return coreClient.getObjectFactory();
} }
public Collection<Session> getOpenSessions() public Collection<Session> getOpenSessions()
@ -403,6 +368,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
/** /**
* Set JVM shutdown behavior. * Set JVM shutdown behavior.
*
* @param stop If true, this client instance will be explicitly stopped when the * @param stop If true, this client instance will be explicitly stopped when the
* JVM is shutdown. Otherwise the application is responsible for maintaining the WebSocketClient lifecycle. * JVM is shutdown. Otherwise the application is responsible for maintaining the WebSocketClient lifecycle.
* @see Runtime#addShutdownHook(Thread) * @see Runtime#addShutdownHook(Thread)
@ -423,6 +389,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
/** /**
* The timeout to allow all remaining open Sessions to be closed gracefully using the close code {@link org.eclipse.jetty.websocket.api.StatusCode#SHUTDOWN}. * The timeout to allow all remaining open Sessions to be closed gracefully using the close code {@link org.eclipse.jetty.websocket.api.StatusCode#SHUTDOWN}.
*
* @param stopTimeout the time in ms to wait for the graceful close, use a value less than or equal to 0 to not gracefully close. * @param stopTimeout the time in ms to wait for the graceful close, use a value less than or equal to 0 to not gracefully close.
*/ */
public void setStopTimeout(long stopTimeout) public void setStopTimeout(long stopTimeout)

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.util.WSURI; import org.eclipse.jetty.websocket.api.util.WSURI;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.tests.AnnoMaxMessageEndpoint; import org.eclipse.jetty.websocket.tests.AnnoMaxMessageEndpoint;
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
@ -54,6 +53,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -62,6 +63,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@ -120,49 +122,23 @@ public class WebSocketClientTest
server.stop(); server.stop();
} }
@Test @ParameterizedTest
public void testCustomizeExecutorDirectly() throws Exception @ValueSource(booleans = {false, true})
public void testCustomizeExecutorDirectly(boolean startHttpClient) throws Exception
{ {
Executor executor = Executors.newFixedThreadPool(50); Executor executor = Executors.newFixedThreadPool(50);
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();
httpClient.setExecutor(executor); httpClient.setExecutor(executor);
try try
{ {
httpClient.start(); if (startHttpClient)
httpClient.start();
WebSocketClient webSocketClient = new WebSocketClient(httpClient); WebSocketClient webSocketClient = new WebSocketClient(httpClient);
try try
{ {
webSocketClient.start(); webSocketClient.start();
Executor inuseExecutor = webSocketClient.getExecutor(); Executor wsExecutor = webSocketClient.getExecutor();
assertSame(executor, inuseExecutor); assertSame(executor, wsExecutor);
}
finally
{
webSocketClient.stop();
}
}
finally
{
httpClient.stop();
}
}
@Test
public void testCustomizeWebSocketComponentsExecutor() throws Exception
{
HttpClient httpClient = new HttpClient();
try
{
httpClient.start();
Executor executor = Executors.newFixedThreadPool(50);
WebSocketComponents webSocketComponents = new WebSocketComponents(null, null,
null, null, null, executor);
WebSocketClient webSocketClient = new WebSocketClient(httpClient, webSocketComponents);
try
{
webSocketClient.start();
Executor inuseExecutor = webSocketClient.getExecutor();
assertSame(executor, inuseExecutor);
} }
finally finally
{ {
@ -330,6 +306,7 @@ public class WebSocketClientTest
assertThat("Message", received, containsString("Hello World")); assertThat("Message", received, containsString("Hello World"));
ByteBuffer bufReceived = cliSock.binaryMessageQueue.poll(5, TimeUnit.SECONDS); ByteBuffer bufReceived = cliSock.binaryMessageQueue.poll(5, TimeUnit.SECONDS);
assertNotNull(bufReceived);
received = BufferUtil.toUTF8String(bufReceived.slice()); received = BufferUtil.toUTF8String(bufReceived.slice());
assertThat("Message", received, containsString(parts[0] + parts[1] + parts[2])); assertThat("Message", received, containsString(parts[0] + parts[1] + parts[2]));
} }
@ -404,7 +381,7 @@ public class WebSocketClientTest
request.setSubProtocols("echo"); request.setSubProtocols("echo");
Future<Session> future = client.connect(cliSock, wsUri, request); Future<Session> future = client.connect(cliSock, wsUri, request);
try (Session sess = future.get(5, TimeUnit.SECONDS)) try (Session ignored = future.get(5, TimeUnit.SECONDS))
{ {
Assertions.assertTrue(cliSock.openLatch.await(1, TimeUnit.SECONDS)); Assertions.assertTrue(cliSock.openLatch.await(1, TimeUnit.SECONDS));
@ -425,7 +402,7 @@ public class WebSocketClientTest
} }
/** /**
* Ensure that <code>@WebSocket(maxTextMessageSize = 100*1024)</code> behaves as expected. * Ensure that {@code @WebSocket(maxTextMessageSize = 100*1024)} behaves as expected.
* *
* @throws Exception on test failure * @throws Exception on test failure
*/ */
@ -456,6 +433,7 @@ public class WebSocketClientTest
// wait for message from server // wait for message from server
String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS); String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS);
assertNotNull(received);
assertThat("Message", received.length(), is(size)); assertThat("Message", received.length(), is(size));
} }
} }
@ -480,9 +458,9 @@ public class WebSocketClientTest
Map<String, List<String>> parameterMap = req.getParameterMap(); Map<String, List<String>> parameterMap = req.getParameterMap();
assertThat("Parameter Map", parameterMap, notNullValue()); assertThat("Parameter Map", parameterMap, notNullValue());
assertThat("Parameter[snack]", parameterMap.get("snack"), is(Arrays.asList(new String[]{"cashews"}))); assertThat("Parameter[snack]", parameterMap.get("snack"), is(List.of("cashews")));
assertThat("Parameter[amount]", parameterMap.get("amount"), is(Arrays.asList(new String[]{"handful"}))); assertThat("Parameter[amount]", parameterMap.get("amount"), is(List.of("handful")));
assertThat("Parameter[brand]", parameterMap.get("brand"), is(Arrays.asList(new String[]{"off"}))); assertThat("Parameter[brand]", parameterMap.get("brand"), is(List.of("off")));
assertThat("Parameter[cost]", parameterMap.get("cost"), nullValue()); assertThat("Parameter[cost]", parameterMap.get("cost"), nullValue());