Issue #5018 - add WebSocketClient UpgradeRequest timeout to jetty 10

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-07-08 18:04:36 +10:00
parent c716edd7a2
commit a179535db3
6 changed files with 42 additions and 32 deletions

View File

@ -23,11 +23,11 @@ import java.net.URI;
import java.security.Principal; import java.security.Principal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
@ -39,19 +39,19 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
*/ */
public final class ClientUpgradeRequest implements UpgradeRequest public final class ClientUpgradeRequest implements UpgradeRequest
{ {
private URI requestURI; private final List<String> subProtocols = new ArrayList<>(1);
private List<String> subProtocols = new ArrayList<>(1); private final List<ExtensionConfig> extensions = new ArrayList<>(1);
private List<ExtensionConfig> extensions = new ArrayList<>(1); private final List<HttpCookie> cookies = new ArrayList<>(1);
private List<HttpCookie> cookies = new ArrayList<>(1); private final Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); private final URI requestURI;
private Map<String, List<String>> parameters = new HashMap<>(1); private final String host;
private String httpVersion; private long timeout;
private String method;
private String host;
public ClientUpgradeRequest() public ClientUpgradeRequest()
{ {
/* anonymous, no requestURI, upgrade request */ /* anonymous, no requestURI, upgrade request */
this.requestURI = null;
this.host = null;
} }
public ClientUpgradeRequest(URI uri) public ClientUpgradeRequest(URI uri)
@ -161,13 +161,13 @@ public final class ClientUpgradeRequest implements UpgradeRequest
@Override @Override
public String getHttpVersion() public String getHttpVersion()
{ {
return httpVersion; throw new UnsupportedOperationException("HttpVersion not available on ClientUpgradeRequest");
} }
@Override @Override
public String getMethod() public String getMethod()
{ {
return method; throw new UnsupportedOperationException("Method not available on ClientUpgradeRequest");
} }
@Override @Override
@ -176,15 +176,10 @@ public final class ClientUpgradeRequest implements UpgradeRequest
return getHeader(HttpHeader.ORIGIN.name()); return getHeader(HttpHeader.ORIGIN.name());
} }
/**
* Returns a map of the query parameters of the request.
*
* @return a unmodifiable map of query parameters of the request.
*/
@Override @Override
public Map<String, List<String>> getParameterMap() public Map<String, List<String>> getParameterMap()
{ {
return Collections.unmodifiableMap(parameters); return Collections.emptyMap();
} }
@Override @Override
@ -297,6 +292,25 @@ public final class ClientUpgradeRequest implements UpgradeRequest
throw new UnsupportedOperationException("HttpSession not available on Client request"); throw new UnsupportedOperationException("HttpSession not available on Client request");
} }
/**
* @param timeout the total timeout for the request/response conversation of the WebSocket handshake;
* use zero or a negative value to disable the timeout
* @param unit the timeout unit
*/
public void setTimeout(long timeout, TimeUnit unit)
{
this.timeout = unit.toMillis(timeout);
}
/**
* @return the total timeout for this request, in milliseconds;
* zero or negative if the timeout is disabled
*/
public long getTimeout()
{
return timeout;
}
/** /**
* ABNF from RFC 2616, RFC 822, and RFC 6455 specified characters requiring quoting. * ABNF from RFC 2616, RFC 822, and RFC 6455 specified characters requiring quoting.
*/ */

View File

@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
@ -41,7 +42,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ShutdownThread; import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketContainer; import org.eclipse.jetty.websocket.api.WebSocketContainer;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -109,7 +109,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
* @return the future for the session, available on success of connect * @return the future for the session, available on success of connect
* @throws IOException if unable to connect * @throws IOException if unable to connect
*/ */
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request) throws IOException public CompletableFuture<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
{ {
return connect(websocket, toUri, request, null); return connect(websocket, toUri, request, null);
} }
@ -124,7 +124,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
* @return the future for the session, available on success of connect * @return the future for the session, available on success of connect
* @throws IOException if unable to connect * @throws IOException if unable to connect
*/ */
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request, JettyUpgradeListener upgradeListener) throws IOException public CompletableFuture<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, JettyUpgradeListener upgradeListener) throws IOException
{ {
for (Connection.Listener listener : getBeans(Connection.Listener.class)) for (Connection.Listener listener : getBeans(Connection.Listener.class))
{ {
@ -132,6 +132,8 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
} }
JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(coreClient, request, toUri, frameHandlerFactory, websocket); JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(coreClient, request, toUri, frameHandlerFactory, websocket);
upgradeRequest.timeout(request.getTimeout(), TimeUnit.MILLISECONDS);
upgradeRequest.setConfiguration(configurationCustomizer);
if (upgradeListener != null) if (upgradeListener != null)
{ {
upgradeRequest.addListener(new UpgradeListener() upgradeRequest.addListener(new UpgradeListener()
@ -149,9 +151,8 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
} }
}); });
} }
upgradeRequest.setConfiguration(configurationCustomizer);
CompletableFuture<Session> futureSession = new CompletableFuture<>();
CompletableFuture<Session> futureSession = new CompletableFuture<>();
coreClient.connect(upgradeRequest).whenComplete((coreSession, error) -> coreClient.connect(upgradeRequest).whenComplete((coreSession, error) ->
{ {
if (error != null) if (error != null)

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.JettyUpgradeListener; import org.eclipse.jetty.websocket.client.JettyUpgradeListener;
@ -104,7 +103,7 @@ public class JettyWebSocketExtensionConfigTest
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath"); URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath");
EventSocket socket = new EventSocket(); EventSocket socket = new EventSocket();
UpgradeRequest request = new ClientUpgradeRequest(); ClientUpgradeRequest request = new ClientUpgradeRequest();
request.addExtensions(ExtensionConfig.parse("permessage-deflate")); request.addExtensions(ExtensionConfig.parse("permessage-deflate"));
CountDownLatch correctResponseExtensions = new CountDownLatch(1); CountDownLatch correctResponseExtensions = new CountDownLatch(1);

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer; import org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer;
@ -81,7 +80,7 @@ public class JettyWebSocketNegotiationTest
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath"); URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath");
EventSocket socket = new EventSocket(); EventSocket socket = new EventSocket();
UpgradeRequest upgradeRequest = new ClientUpgradeRequest(); ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.addExtensions("permessage-deflate;invalidParameter"); upgradeRequest.addExtensions("permessage-deflate;invalidParameter");
CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest); CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest);

View File

@ -36,7 +36,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.security.Constraint; import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.util.security.Credential; import org.eclipse.jetty.util.security.Credential;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
@ -139,7 +138,7 @@ public class WebSocketServletExamplesTest
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/advancedEcho"); URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/advancedEcho");
EventSocket socket = new EventSocket(); EventSocket socket = new EventSocket();
UpgradeRequest upgradeRequest = new ClientUpgradeRequest(); ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("text"); upgradeRequest.setSubProtocols("text");
CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest); CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest);
try (Session session = connect.get(5, TimeUnit.SECONDS)) try (Session session = connect.get(5, TimeUnit.SECONDS))

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.websocket.tests.EchoSocket;
import org.eclipse.jetty.websocket.tests.EventSocket; import org.eclipse.jetty.websocket.tests.EventSocket;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -109,14 +108,13 @@ public class ClientTimeoutTest
assertThat(coreUpgradeException.getCause(), instanceOf(TimeoutException.class)); assertThat(coreUpgradeException.getCause(), instanceOf(TimeoutException.class));
} }
@Disabled("need the client timeout to be ported from 9.4 to 10")
@Test @Test
public void testClientUpgradeRequestTimeout() throws Exception public void testClientUpgradeRequestTimeout() throws Exception
{ {
EventSocket clientSocket = new EventSocket(); EventSocket clientSocket = new EventSocket();
long timeout = 1000; long timeout = 1000;
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
// TODO: upgradeRequest.setTimeout(timeout, TimeUnit.MILLISECONDS); upgradeRequest.setTimeout(timeout, TimeUnit.MILLISECONDS);
Future<Session> connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI()), upgradeRequest); Future<Session> connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI()), upgradeRequest);
ExecutionException executionException = assertThrows(ExecutionException.class, () -> connect.get(timeout * 2, TimeUnit.MILLISECONDS)); ExecutionException executionException = assertThrows(ExecutionException.class, () -> connect.get(timeout * 2, TimeUnit.MILLISECONDS));