Issue #3537 - Bootstrapping WebSockets with HTTP/2.

Implemented upgrade logic for WebSocket over HTTP/2.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-08-14 14:57:21 +02:00
parent a700907522
commit 01af85db42
35 changed files with 870 additions and 363 deletions

View File

@ -87,6 +87,7 @@ public class HttpRequest implements Request
private List<RequestListener> requestListeners;
private BiFunction<Request, Request, Response.CompleteListener> pushListener;
private Supplier<HttpFields> trailers;
private String upgradeProtocol;
protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
{
@ -609,6 +610,12 @@ public class HttpRequest implements Request
return this;
}
public HttpRequest upgradeProtocol(String upgradeProtocol)
{
this.upgradeProtocol = upgradeProtocol;
return this;
}
@Override
public ContentProvider getContent()
{
@ -765,6 +772,11 @@ public class HttpRequest implements Request
return trailers;
}
public String getUpgradeProtocol()
{
return upgradeProtocol;
}
@Override
public boolean abort(Throwable cause)
{

View File

@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
public interface HttpUpgrader
{
public void prepare(HttpRequest request);
public void upgrade(HttpResponse response, EndPoint endPoint);
public interface Factory
{
public HttpUpgrader newHttpUpgrader(HttpVersion version);
}
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
@ -98,29 +99,24 @@ public class HttpChannelOverHTTP extends HttpChannel
return result;
HttpResponse response = exchange.getResponse();
if ((response.getVersion() == HttpVersion.HTTP_1_1) &&
(response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101))
if (response.getVersion() == HttpVersion.HTTP_1_1 && response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
String nextConnection = response.getHeaders().get(HttpHeader.CONNECTION);
if ((nextConnection == null) || !nextConnection.toLowerCase(Locale.US).contains("upgrade"))
{
return new Result(result, new HttpResponseException("101 Switching Protocols without Connection: Upgrade not supported", response));
}
String header = response.getHeaders().get(HttpHeader.CONNECTION);
if (header == null || !header.toLowerCase(Locale.US).contains("upgrade"))
return new Result(result, new HttpResponseException("101 response without 'Connection: Upgrade'", response));
// Upgrade Response
HttpRequest request = exchange.getRequest();
HttpConnectionUpgrader upgrader = (HttpConnectionUpgrader)request.getConversation().getAttribute(HttpConnectionUpgrader.class.getName());
if (upgrader != null)
{
HttpUpgrader upgrader = (HttpUpgrader)request.getConversation().getAttribute(HttpUpgrader.class.getName());
if (upgrader == null)
return new Result(result, new HttpResponseException("101 response without " + HttpUpgrader.class.getSimpleName(), response));
try
{
upgrader.upgrade(response, getHttpConnection());
upgrader.upgrade(response, getHttpConnection().getEndPoint());
}
catch (Throwable x)
{
return new Result(result, x);
}
return new Result(result, new HttpResponseException("Could not upgrade to WebSocket", response, x));
}
}

View File

@ -30,11 +30,14 @@ import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.IConnection;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
@ -268,6 +271,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
request.timeout(connectTimeout, TimeUnit.MILLISECONDS)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS);
}
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_1_1);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}
@Override

View File

@ -333,8 +333,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200)
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && status == HttpStatus.OK_200)
return true;
return false;

View File

@ -250,6 +250,11 @@ public class MetaData implements Iterable<HttpField>
private String _protocol;
public ConnectRequest(HttpScheme scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
this(scheme == null ? null : scheme.asString(), authority, path, fields, protocol);
}
public ConnectRequest(String scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
super(HttpMethod.CONNECT.asString(), scheme, authority, path, HttpVersion.HTTP_2, fields, Long.MIN_VALUE);
_protocol = protocol;

View File

@ -32,7 +32,9 @@ import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
@ -88,6 +90,18 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return send(channel, exchange);
}
@Override
protected void normalizeRequest(Request request)
{
super.normalizeRequest(request);
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_2);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}
protected HttpChannelOverHTTP2 acquireHttpChannel()
{
HttpChannelOverHTTP2 channel = idleChannels.poll();

View File

@ -27,10 +27,12 @@ import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
@ -106,7 +108,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
if (LOG.isDebugEnabled())
LOG.debug("Successful HTTP2 tunnel on {} via {}", stream, endPoint);
((IStream)stream).setAttachment(endPoint);
httpRequest.getConversation().setAttribute(EndPoint.class.getName(), endPoint);
HttpConversation conversation = httpRequest.getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName());
if (upgrader != null)
upgrader.upgrade(httpResponse, endPoint);
}
if (responseHeaders(exchange))

View File

@ -59,7 +59,16 @@ public class HttpSenderOverHTTP2 extends HttpSender
MetaData.Request metaData;
if (isTunnel)
{
metaData = new MetaData.Request(request.getMethod(), null, new HostPortHttpField(request.getPath()), null, HttpVersion.HTTP_2, request.getHeaders());
String upgradeProtocol = request.getUpgradeProtocol();
if (upgradeProtocol == null)
{
metaData = new MetaData.ConnectRequest((String)null, new HostPortHttpField(request.getPath()), null, request.getHeaders(), null);
}
else
{
HostPortHttpField authority = new HostPortHttpField(request.getHost(), request.getPort());
metaData = new MetaData.ConnectRequest(request.getScheme(), authority, request.getPath(), request.getHeaders(), upgradeProtocol);
}
}
else
{

View File

@ -340,6 +340,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
Object attachment = stream.getAttachment();
if (attachment instanceof HttpChannelOverHTTP2)
{
// TODO: we used to "fake" a 101 response to upgrade the endpoint
// but we don't anymore, so this code should be deleted.
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)attachment;
if (channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{

View File

@ -407,6 +407,12 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
return true;
}
@Override
protected void checkAndPrepareUpgrade()
{
// TODO: move the code from HttpConnection.upgrade() here?
}
/**
* <p>Attempts to perform a HTTP/1.1 upgrade.</p>
* <p>The upgrade looks up a {@link ConnectionFactory.Upgrading} from the connector

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.javax.client;
import java.net.URI;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
@ -43,11 +43,11 @@ public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
}
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public void upgrade(HttpResponse response, EndPoint endPoint)
{
frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
super.upgrade(response, endPoint);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.javax.tests;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -53,7 +54,7 @@ public class LocalFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
public LocalFuzzer(Provider provider, CharSequence requestPath) throws Exception
{
this(provider, requestPath, UpgradeUtils.newDefaultUpgradeRequestHeaders());
this(provider, requestPath, new HashMap<>());
}
public LocalFuzzer(Provider provider, CharSequence requestPath, Map<String, String> headers) throws Exception

View File

@ -208,9 +208,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
}
@Override
protected void customize(EndPoint endp)
protected void customize(EndPoint endPoint)
{
frameCapture.setEndPoint(endp);
frameCapture.setEndPoint(endPoint);
futureCapture.complete(frameCapture);
}

View File

@ -19,9 +19,6 @@
package org.eclipse.jetty.websocket.javax.tests;
import java.util.Map;
import java.util.TreeMap;
import org.eclipse.jetty.http.HttpHeader;
public class UpgradeUtils
{
@ -31,32 +28,9 @@ public class UpgradeUtils
upgradeRequest.append("GET ");
upgradeRequest.append(requestPath == null ? "/" : requestPath);
upgradeRequest.append(" HTTP/1.1\r\n");
headers.entrySet().stream().forEach(e ->
headers.entrySet().forEach(e ->
upgradeRequest.append(e.getKey()).append(": ").append(e.getValue()).append("\r\n"));
upgradeRequest.append("\r\n");
return upgradeRequest.toString();
}
public static String generateUpgradeRequest()
{
return generateUpgradeRequest("/", newDefaultUpgradeRequestHeaders());
}
public static String generateUpgradeRequest(CharSequence requestPath)
{
return generateUpgradeRequest(requestPath, newDefaultUpgradeRequestHeaders());
}
public static Map<String, String> newDefaultUpgradeRequestHeaders()
{
Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
headers.put("Host", "local");
headers.put("Connection", "Upgrade");
headers.put("Upgrade", "WebSocket");
headers.put(HttpHeader.SEC_WEBSOCKET_KEY.asString(), "dGhlIHNhbXBsZSBub25jZQ==");
headers.put(HttpHeader.ORIGIN.asString(), "ws://local/");
// headers.put(WSConstants.SEC_WEBSOCKET_PROTOCOL, "echo");
headers.put(HttpHeader.SEC_WEBSOCKET_VERSION.asString(), "13");
return headers;
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.javax.tests.server;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,7 +31,6 @@ import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.UpgradeUtils;
import org.eclipse.jetty.websocket.javax.tests.WSServer;
import org.eclipse.jetty.websocket.javax.tests.coders.DateDecoder;
import org.eclipse.jetty.websocket.javax.tests.coders.TimeEncoder;
@ -72,8 +72,8 @@ public class AnnotatedServerEndpointTest
private void assertResponse(String message, String expectedText) throws Exception
{
Map<String, String> upgradeRequest = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeRequest.put(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), "echo");
Map<String, String> headers = new HashMap<>();
headers.put(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), "echo");
List<Frame> send = new ArrayList<>();
send.add(new Frame(OpCode.TEXT).setPayload(message));
@ -83,7 +83,7 @@ public class AnnotatedServerEndpointTest
expect.add(new Frame(OpCode.TEXT).setPayload(expectedText));
expect.add(CloseStatus.toFrame(CloseStatus.NORMAL));
try (Fuzzer session = server.newNetworkFuzzer("/app/echo", upgradeRequest))
try (Fuzzer session = server.newNetworkFuzzer("/app/echo", headers))
{
session.sendFrames(send);
session.expect(expect);

View File

@ -19,7 +19,7 @@
#
# org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
org.eclipse.jetty.LEVEL=INFO
# org.eclipse.jetty.util.log.stderr.LONG=true
# org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG

View File

@ -255,34 +255,6 @@ public interface UpgradeRequest
*/
void setHeaders(Map<String, List<String>> headers);
/**
* Set the HTTP Version to use.
* <p>
* As of <a href="http://tools.ietf.org/html/rfc6455">RFC6455 (December 2011)</a> this should always be
* {@code HTTP/1.1}
*
* @param httpVersion the HTTP version to use.
*/
void setHttpVersion(String httpVersion);
/**
* Set the HTTP method to use.
* <p>
* As of <a href="http://tools.ietf.org/html/rfc6455">RFC6455 (December 2011)</a> this is always {@code GET}
*
* @param method the HTTP method to use.
*/
void setMethod(String method);
/**
* Set the Request URI to use for this request.
* <p>
* Must be an absolute URI with scheme {@code 'ws'} or {@code 'wss'}
*
* @param uri the Request URI
*/
void setRequestURI(URI uri);
/**
* Set the Session associated with this request.
* <p>

View File

@ -26,9 +26,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -46,7 +48,6 @@ public final class ClientUpgradeRequest implements UpgradeRequest
private String httpVersion;
private String method;
private String host;
private boolean secure;
public ClientUpgradeRequest()
{
@ -57,12 +58,9 @@ public final class ClientUpgradeRequest implements UpgradeRequest
{
this.requestURI = uri;
String scheme = uri.getScheme();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
{
if (!HttpScheme.WS.is(scheme) || !HttpScheme.WSS.is(scheme))
throw new IllegalArgumentException("URI scheme must be 'ws' or 'wss'");
}
this.host = this.requestURI.getHost();
this.parameters.clear();
}
@Override
@ -193,11 +191,7 @@ public final class ClientUpgradeRequest implements UpgradeRequest
public String getProtocolVersion()
{
String version = getHeader("Sec-WebSocket-Version");
if (version == null)
{
return "13"; // Default
}
return version;
return Objects.requireNonNullElse(version, "13");
}
@Override
@ -288,8 +282,7 @@ public final class ClientUpgradeRequest implements UpgradeRequest
@Override
public void setHeaders(Map<String, List<String>> headers)
{
headers.clear();
this.headers.clear();
for (Map.Entry<String, List<String>> entry : headers.entrySet())
{
String name = entry.getKey();
@ -298,24 +291,6 @@ public final class ClientUpgradeRequest implements UpgradeRequest
}
}
@Override
public void setHttpVersion(String httpVersion)
{
this.httpVersion = httpVersion;
}
@Override
public void setMethod(String method)
{
this.method = method;
}
@Override
public void setRequestURI(URI uri)
{
this.requestURI = uri;
}
@Override
public void setSession(Object session)
{

View File

@ -231,24 +231,6 @@ public class DelegatedJettyClientUpgradeRequest implements UpgradeRequest
// TODO
}
@Override
public void setHttpVersion(String httpVersion)
{
// TODO
}
@Override
public void setMethod(String method)
{
}
@Override
public void setRequestURI(URI uri)
{
// TODO
}
@Override
public void setSession(Object session)
{

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
@ -89,18 +88,18 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
}
@Override
protected void customize(EndPoint endp)
protected void customize(EndPoint endPoint)
{
super.customize(endp);
handshakeRequest.configure(endp);
super.customize(endPoint);
handshakeRequest.configure(endPoint);
}
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public void upgrade(HttpResponse response, EndPoint endPoint)
{
frameHandler.setUpgradeRequest(new DelegatedJettyClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJettyClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
super.upgrade(response, endPoint);
}
@Override

View File

@ -185,24 +185,6 @@ public class DummyUpgradeRequest implements UpgradeRequest
}
@Override
public void setHttpVersion(String httpVersion)
{
}
@Override
public void setMethod(String method)
{
}
@Override
public void setRequestURI(URI uri)
{
}
@Override
public void setSession(Object session)
{

View File

@ -197,24 +197,6 @@ public class UpgradeRequestAdapter implements UpgradeRequest
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setHttpVersion(String httpVersion)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setMethod(String method)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setRequestURI(URI uri)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setSession(Object session)
{

View File

@ -19,26 +19,37 @@
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-api</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.tests</groupId>
<artifactId>jetty-http-tools</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>compile</scope>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -57,14 +57,14 @@ public class EventSocket
{
this.session = session;
behavior = session.getPolicy().getBehavior().name();
LOG.info("{} onOpen(): {}", toString(), session);
LOG.debug("{} onOpen(): {}", toString(), session);
openLatch.countDown();
}
@OnWebSocketMessage
public void onMessage(String message) throws IOException
{
LOG.info("{} onMessage(): {}", toString(), message);
LOG.debug("{} onMessage(): {}", toString(), message);
messageQueue.offer(message);
}
@ -72,14 +72,14 @@ public class EventSocket
public void onMessage(byte buf[], int offset, int len)
{
ByteBuffer message = ByteBuffer.wrap(buf, offset, len);
LOG.info("{} onMessage(): {}", toString(), message);
LOG.debug("{} onMessage(): {}", toString(), message);
binaryMessageQueue.offer(message);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
LOG.info("{} onClose(): {}:{}", toString(), statusCode, reason);
LOG.debug("{} onClose(): {}:{}", toString(), statusCode, reason);
this.statusCode = statusCode;
this.reason = reason;
closeLatch.countDown();
@ -88,7 +88,7 @@ public class EventSocket
@OnWebSocketError
public void onError(Throwable cause)
{
LOG.info("{} onError(): {}", toString(), cause);
LOG.debug("{} onError(): {}", toString(), cause);
error = cause;
errorLatch.countDown();
}

View File

@ -0,0 +1,131 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketOverHTTP2Test
{
private Server server;
private ServerConnector connector;
@BeforeEach
public void startServer() throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
HttpConfiguration httpConfiguration = new HttpConfiguration();
HttpConnectionFactory h1 = new HttpConnectionFactory(httpConfiguration);
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(httpConfiguration);
connector = new ServerConnector(server, 1, 1, h1, h2c);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(new ServletHolder(new JettyWebSocketServlet()
{
@Override
protected void configure(JettyWebSocketServletFactory factory)
{
factory.addMapping("/ws/echo", (req, resp) -> new EchoSocket());
}
}), "/ws/*");
JettyWebSocketServletContainerInitializer.initialize(context);
server.start();
}
@AfterEach
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testWebSocketOverDynamicHTTP1() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> HttpClientConnectionFactory.HTTP11);
}
@Test
public void testWebSocketOverDynamicHTTP2() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector)));
}
private void testWebSocketOverDynamicTransport(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
{
ClientConnector clientConnector = new ClientConnector();
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, protocolFn.apply(clientConnector)));
WebSocketClient wsClient = new WebSocketClient(httpClient);
wsClient.start();
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/echo");
Session session = wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS);
String text = "websocket";
session.getRemote().sendString(text);
String message = wsEndPoint.messageQueue.poll(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(text, message);
session.close(StatusCode.NORMAL, null);
assertTrue(wsEndPoint.closeLatch.await(5, TimeUnit.SECONDS));
assertEquals(StatusCode.NORMAL, wsEndPoint.statusCode);
assertNull(wsEndPoint.error);
}
}

View File

@ -1,25 +1,5 @@
#
#
# ========================================================================
# Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
# ------------------------------------------------------------------------
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Apache License v2.0 which accompanies this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# The Apache License v2.0 is available at
# http://www.opensource.org/licenses/apache2.0.php
#
# You may elect to redistribute this code under either of these licenses.
# ========================================================================
#
#
# org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
#org.eclipse.jetty.util.log.stderr.LONG=true
#org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
@ -38,7 +18,7 @@ org.eclipse.jetty.LEVEL=WARN
#org.eclipse.jetty.websocket.tests.client.jsr356.LEVEL=DEBUG
#org.eclipse.jetty.websocket.tests.server.LEVEL=DEBUG
#org.eclipse.jetty.websocket.tests.server.jsr356.LEVEL=DEBUG
### Showing any unintended (ignored) errors from CompletionCallback
## Showing any unintended (ignored) errors from CompletionCallback
#org.eclipse.jetty.websocket.common.CompletionCallback.LEVEL=ALL
### Disabling intentional error out of RFCSocket
## Disabling intentional error out of RFCSocket
org.eclipse.jetty.websocket.tests.server.RFCSocket.LEVEL=OFF

View File

@ -20,12 +20,10 @@ package org.eclipse.jetty.websocket.core.client;
import java.net.URI;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -33,16 +31,13 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionUpgrader;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
@ -64,10 +59,9 @@ import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
public abstract class ClientUpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpConnectionUpgrader
public abstract class ClientUpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpUpgrader.Factory
{
public static ClientUpgradeRequest from(WebSocketCoreClient webSocketClient, URI requestURI, FrameHandler frameHandler)
{
@ -103,8 +97,8 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
throw new IllegalArgumentException("WebSocket URI must include a scheme");
}
String scheme = requestURI.getScheme().toLowerCase(Locale.ENGLISH);
if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
String scheme = requestURI.getScheme();
if (!HttpScheme.WS.is(scheme) && !HttpScheme.WSS.is(scheme))
{
throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
}
@ -116,10 +110,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
this.wsClient = webSocketClient;
this.futureCoreSession = new CompletableFuture<>();
method(HttpMethod.GET);
version(HttpVersion.HTTP_1_1);
getConversation().setAttribute(HttpConnectionUpgrader.class.getName(), this);
}
public void setConfiguration(FrameHandler.ConfigurationCustomizer config)
@ -152,12 +142,10 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
public List<ExtensionConfig> getExtensions()
{
List<ExtensionConfig> extensions = getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, true)
return getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, true)
.stream()
.map(ExtensionConfig::parse)
.collect(Collectors.toList());
return extensions;
}
public void setExtensions(List<ExtensionConfig> configs)
@ -172,8 +160,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
public List<String> getSubProtocols()
{
List<String> subProtocols = getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
return subProtocols;
return getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
}
public void setSubProtocols(String... protocols)
@ -198,18 +185,10 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
@Override
public void send(final Response.CompleteListener listener)
{
try
{
frameHandler = getFrameHandler();
if (frameHandler == null)
throw new IllegalArgumentException("FrameHandler could not be created");
}
catch (Throwable t)
{
throw new IllegalArgumentException("FrameHandler could not be created", t);
}
initWebSocketHeaders();
super.send(listener);
}
@ -279,20 +258,58 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
}
}
@SuppressWarnings("Duplicates")
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public HttpUpgrader newHttpUpgrader(HttpVersion version)
{
if (!this.getHeaders().get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket"))
throw new HttpResponseException("Not a WebSocket Upgrade", response);
if (version == HttpVersion.HTTP_1_1)
return new HttpUpgraderOverHTTP(this);
else if (version == HttpVersion.HTTP_2)
return new HttpUpgraderOverHTTP2(this);
else
throw new UnsupportedOperationException("Unsupported HTTP version for upgrade: " + version);
}
// Check the Accept hash
String reqKey = this.getHeaders().get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = WebSocketCore.hashKey(reqKey);
String respHash = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_ACCEPT);
if (expectedHash.equalsIgnoreCase(respHash) == false)
throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash (was:" + respHash + ", expected:" + expectedHash + ")", response);
/**
* Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive)
*/
protected void customize(EndPoint endPoint)
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler();
private void initWebSocketHeaders()
{
notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
}
private void notifyUpgradeListeners(Consumer<UpgradeListener> action)
{
for (UpgradeListener listener : upgradeListeners)
{
try
{
action.accept(listener);
}
catch (Throwable t)
{
LOG.info("Exception while invoking listener " + listener, t);
}
}
}
public void upgrade(HttpResponse response, EndPoint endPoint)
{
// Parse the Negotiated Extensions
List<ExtensionConfig> negotiatedExtensions = new ArrayList<>();
HttpField extField = response.getHeaders().getField(HttpHeader.SEC_WEBSOCKET_EXTENSIONS);
@ -341,7 +358,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
if (values != null)
{
if (values.length > 1)
throw new WebSocketException("Upgrade failed: Too many WebSocket subprotocol's in response: " + values);
throw new WebSocketException("Upgrade failed: Too many WebSocket subprotocol's in response: " + Arrays.toString(values));
else if (values.length == 1)
negotiatedSubProtocol = values[0];
}
@ -355,8 +372,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
throw new WebSocketException("Upgrade failed: subprotocol [" + negotiatedSubProtocol + "] not found in offered subprotocols " + offeredSubProtocols);
// We can upgrade
EndPoint endp = httpConnection.getEndPoint();
customize(endp);
customize(endPoint);
Request request = response.getRequest();
Negotiated negotiated = new Negotiated(
@ -370,7 +386,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
customizer.customize(coreSession);
HttpClient httpClient = wsClient.getHttpClient();
WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
WebSocketConnection wsConnection = newWebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
for (Connection.Listener listener : wsClient.getBeans(Connection.Listener.class))
{
@ -384,7 +400,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
// Now swap out the connection
try
{
endp.upgrade(wsConnection);
endPoint.upgrade(wsConnection);
futureCoreSession.complete(coreSession);
}
catch (Throwable t)
@ -392,79 +408,4 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
futureCoreSession.completeExceptionally(t);
}
}
/**
* Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive)
*
* @see <a href="https://github.com/eclipse/jetty.project/issues/1811">Issue #1811 - Customization of WebSocket Connections via WebSocketPolicy</a>
*/
protected void customize(EndPoint endp)
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endp, executor, scheduler, byteBufferPool, coreSession);
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler();
private final String genRandomKey()
{
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return Base64.getEncoder().encodeToString(bytes);
}
private void initWebSocketHeaders()
{
method(HttpMethod.GET);
version(HttpVersion.HTTP_1_1);
// The Upgrade Headers
setHeaderIfNotPresent(HttpHeader.UPGRADE, "websocket");
setHeaderIfNotPresent(HttpHeader.CONNECTION, "Upgrade");
// The WebSocket Headers
setHeaderIfNotPresent(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
setHeaderIfNotPresent(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
// (Per the hybi list): Add no-cache headers to avoid compatibility issue.
// There are some proxies that rewrite "Connection: upgrade"
// to "Connection: close" in the response if a request doesn't contain
// these headers.
setHeaderIfNotPresent(HttpHeader.PRAGMA, "no-cache");
setHeaderIfNotPresent(HttpHeader.CACHE_CONTROL, "no-cache");
// Notify upgrade hooks
notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
}
private void setHeaderIfNotPresent(HttpHeader header, String value)
{
if (!getHeaders().contains(header))
{
getHeaders().put(header, value);
}
}
private void notifyUpgradeListeners(Consumer<UpgradeListener> action)
{
for (UpgradeListener listener : upgradeListeners)
{
try
{
action.accept(listener);
}
catch (Throwable t)
{
LOG.warn("Unhandled error: " + t.getMessage(), t);
}
}
}
}

View File

@ -0,0 +1,86 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.client;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
public class HttpUpgraderOverHTTP implements HttpUpgrader
{
private final ClientUpgradeRequest clientUpgradeRequest;
public HttpUpgraderOverHTTP(ClientUpgradeRequest clientUpgradeRequest)
{
this.clientUpgradeRequest = clientUpgradeRequest;
}
@Override
public void prepare(HttpRequest request)
{
request.method(HttpMethod.GET).version(HttpVersion.HTTP_1_1);
request.header(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
request.header(HttpHeader.UPGRADE, "websocket");
request.header(HttpHeader.CONNECTION, "Upgrade");
request.header(HttpHeader.SEC_WEBSOCKET_KEY, generateRandomKey());
// Per the hybi list: Add no-cache headers to avoid compatibility issue.
// There are some proxies that rewrite "Connection: upgrade" to
// "Connection: close" in the response if a request doesn't contain
// these headers.
request.header(HttpHeader.PRAGMA, "no-cache");
request.header(HttpHeader.CACHE_CONTROL, "no-cache");
}
private String generateRandomKey()
{
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return new String(Base64.getEncoder().encode(bytes), StandardCharsets.US_ASCII);
}
@Override
public void upgrade(HttpResponse response, EndPoint endPoint)
{
HttpRequest request = (HttpRequest)response.getRequest();
HttpFields requestHeaders = request.getHeaders();
if (!requestHeaders.get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket"))
throw new HttpResponseException("Not a WebSocket Upgrade", response);
// Check the Accept hash
String reqKey = requestHeaders.get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = WebSocketCore.hashKey(reqKey);
String respHash = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_ACCEPT);
if (!expectedHash.equalsIgnoreCase(respHash))
throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash (was:" + respHash + ", expected:" + expectedHash + ")", response);
clientUpgradeRequest.upgrade(response, endPoint);
}
}

View File

@ -0,0 +1,51 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.client;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
public class HttpUpgraderOverHTTP2 implements HttpUpgrader
{
private final ClientUpgradeRequest clientUpgradeRequest;
public HttpUpgraderOverHTTP2(ClientUpgradeRequest clientUpgradeRequest)
{
this.clientUpgradeRequest = clientUpgradeRequest;
}
@Override
public void prepare(HttpRequest request)
{
request.method(HttpMethod.CONNECT);
request.upgradeProtocol("websocket");
request.header(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
}
@Override
public void upgrade(HttpResponse response, EndPoint endPoint)
{
clientUpgradeRequest.upgrade(response, endPoint);
}
}

View File

@ -23,19 +23,16 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.server.internal.HandshakerSelector;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.eclipse.jetty.websocket.core.server.internal.RFC8441Handshaker;
public interface Handshaker
{
static Handshaker newInstance()
{
return new RFC6455Handshaker();
return new HandshakerSelector(new RFC6455Handshaker(), new RFC8441Handshaker());
}
boolean upgradeRequest(
WebSocketNegotiator negotiator,
HttpServletRequest request,
HttpServletResponse response,
FrameHandler.Customizer defaultCustomizer)
throws IOException;
boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException;
}

View File

@ -0,0 +1,58 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public class HandshakerSelector implements Handshaker
{
private List<Handshaker> handshakers = new ArrayList<>();
public HandshakerSelector(Handshaker... handshakers)
{
Collections.addAll(this.handshakers, handshakers);
}
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
{
// TODO: we don't want to do a lot of work for every request that is not websocket.
// Something like: if (method == CONNECT) only try 8441, else if (method == GET) only try 6455.
// TODO: optimise (do pre checks and avoid iterating through handshakers)
// TODO: minimum simplest thing to do to return false
for (Handshaker handshaker : handshakers)
{
if (handshaker.upgradeRequest(negotiator, request, response, defaultCustomizer))
return true;
if (response.isCommitted())
return false;
}
return false;
}
}

View File

@ -0,0 +1,249 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public class RFC8441Handshaker implements Handshaker
{
static final Logger LOG = Log.getLogger(RFC8441Handshaker.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
{
Request baseRequest = Request.getBaseRequest(request);
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
if (!HttpVersion.HTTP_2.equals(baseRequest.getHttpVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded HttpVersion!=2 {}", baseRequest);
return false;
}
if (!HttpMethod.CONNECT.is(request.getMethod()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded method!=GET {}", baseRequest);
return false;
}
if (negotiator == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no WebSocketNegotiator {}", baseRequest);
return false;
}
ByteBufferPool pool = negotiator.getByteBufferPool();
if (pool == null)
pool = baseRequest.getHttpChannel().getConnector().getByteBufferPool();
Negotiation negotiation = new RFC8441Negotiation(baseRequest, request, response, new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
if (!negotiation.isUpgrade())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", baseRequest);
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), baseRequest);
return false;
}
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (LOG.isDebugEnabled())
LOG.debug("negotiated handler {}", handler);
// Handle error responses
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", baseRequest);
baseRequest.setHandled(true);
return false;
}
if (response.getStatus() > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: error sent {} {}", response.getStatus(), baseRequest);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Check for handler
if (handler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided {}", baseRequest);
return false;
}
// validate negotiated subprotocol
String subprotocol = negotiation.getSubprotocol();
if (subprotocol != null)
{
if (!negotiation.getOfferedSubprotocols().contains(subprotocol))
throw new WebSocketException("not upgraded: selected a subprotocol not present in offered subprotocols");
}
else
{
if (!negotiation.getOfferedSubprotocols().isEmpty())
throw new WebSocketException("not upgraded: no subprotocol selected from offered subprotocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(
baseRequest.getHttpURI().toURI(),
subprotocol,
baseRequest.isSecure(),
extensionStack,
WebSocketConstants.SPEC_VERSION_STRING);
// Create the Channel
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("coreSession {}", coreSession);
// Create a connection
EndPoint endPoint = baseRequest.getHttpChannel().getTunnellingEndPoint();
WebSocketConnection connection = newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
for (Connection.Listener listener : connector.getBeans(Connection.Listener.class))
{
connection.addListener(listener);
}
coreSession.setWebSocketConnection(connection);
// send upgrade response
Response baseResponse = baseRequest.getResponse();
baseResponse.setStatus(HttpStatus.OK_200);
// See bugs.eclipse.org/485969
if (getSendServerVersion(connector))
baseResponse.getHttpFields().put(SERVER_VERSION);
baseRequest.setHandled(true);
// upgrade
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={}", connection, coreSession);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
return true;
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated);
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
private boolean getSendServerVersion(Connector connector)
{
ConnectionFactory connFactory = connector.getConnectionFactory(HttpVersion.HTTP_2.asString());
if (connFactory == null)
return false;
if (connFactory instanceof HttpConnectionFactory)
{
HttpConfiguration httpConf = ((HttpConnectionFactory)connFactory).getHttpConfiguration();
if (httpConf != null)
return httpConf.getSendServerVersion();
}
return false;
}
}

View File

@ -0,0 +1,44 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.Negotiation;
public class RFC8441Negotiation extends Negotiation
{
public RFC8441Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents components) throws BadMessageException
{
super(baseRequest, request, response, components);
}
@Override
public boolean isUpgrade()
{
if (!getBaseRequest().hasMetaData())
return false;
return "websocket".equals(getBaseRequest().getMetaData().getProtocol());
}
}