Merge pull request #3740 from eclipse/jetty-10.0.x-3537-bootstrap_websocket_http2

Issue #3537 - Bootstrap websocket on HTTP/2
This commit is contained in:
Simone Bordet 2019-11-28 15:47:26 +01:00 committed by GitHub
commit 3b817821e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 1463 additions and 677 deletions

View File

@ -1101,7 +1101,7 @@ public class HttpClient extends ContainerLifeCycle
return port == 80;
}
static boolean isSchemeSecure(String scheme)
public static boolean isSchemeSecure(String scheme)
{
return HttpScheme.HTTPS.is(scheme) || HttpScheme.WSS.is(scheme);
}

View File

@ -88,6 +88,7 @@ public class HttpRequest implements Request
private List<RequestListener> requestListeners;
private BiFunction<Request, Request, Response.CompleteListener> pushListener;
private Supplier<HttpFields> trailers;
private String upgradeProtocol;
protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
{
@ -635,6 +636,12 @@ public class HttpRequest implements Request
return this;
}
public HttpRequest upgradeProtocol(String upgradeProtocol)
{
this.upgradeProtocol = upgradeProtocol;
return this;
}
@Override
public ContentProvider getContent()
{
@ -791,6 +798,11 @@ public class HttpRequest implements Request
return trailers;
}
public String getUpgradeProtocol()
{
return upgradeProtocol;
}
@Override
public boolean abort(Throwable cause)
{

View File

@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
public interface HttpUpgrader
{
public void prepare(HttpRequest request);
public void upgrade(HttpResponse response, EndPoint endPoint);
public interface Factory
{
public HttpUpgrader newHttpUpgrader(HttpVersion version);
}
}

View File

@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import org.eclipse.jetty.alpn.client.ALPNClientConnection;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpRequest;
@ -36,7 +37,6 @@ import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
@ -121,7 +121,7 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans
@Override
public HttpDestination.Key newDestinationKey(HttpRequest request, Origin origin)
{
boolean ssl = HttpScheme.HTTPS.is(request.getScheme());
boolean ssl = HttpClient.isSchemeSecure(request.getScheme());
String http1 = "http/1.1";
String http2 = ssl ? "h2" : "h2c";
List<String> protocols = List.of();

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
@ -98,29 +99,24 @@ public class HttpChannelOverHTTP extends HttpChannel
return result;
HttpResponse response = exchange.getResponse();
if ((response.getVersion() == HttpVersion.HTTP_1_1) &&
(response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101))
if (response.getVersion() == HttpVersion.HTTP_1_1 && response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
String nextConnection = response.getHeaders().get(HttpHeader.CONNECTION);
if ((nextConnection == null) || !nextConnection.toLowerCase(Locale.US).contains("upgrade"))
{
return new Result(result, new HttpResponseException("101 Switching Protocols without Connection: Upgrade not supported", response));
}
String header = response.getHeaders().get(HttpHeader.CONNECTION);
if (header == null || !header.toLowerCase(Locale.US).contains("upgrade"))
return new Result(result, new HttpResponseException("101 response without 'Connection: Upgrade'", response));
// Upgrade Response
HttpRequest request = exchange.getRequest();
HttpConnectionUpgrader upgrader = (HttpConnectionUpgrader)request.getConversation().getAttribute(HttpConnectionUpgrader.class.getName());
if (upgrader != null)
HttpUpgrader upgrader = (HttpUpgrader)request.getConversation().getAttribute(HttpUpgrader.class.getName());
if (upgrader == null)
return new Result(result, new HttpResponseException("101 response without " + HttpUpgrader.class.getSimpleName(), response));
try
{
try
{
upgrader.upgrade(response, getHttpConnection());
}
catch (Throwable x)
{
return new Result(result, x);
}
upgrader.upgrade(response, getHttpConnection().getEndPoint());
}
catch (Throwable x)
{
return new Result(result, new HttpResponseException("Could not upgrade to WebSocket", response, x));
}
}

View File

@ -30,11 +30,14 @@ import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.IConnection;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
@ -258,6 +261,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
request.timeout(connectTimeout, TimeUnit.MILLISECONDS)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS);
}
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_1_1);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}
@Override

View File

@ -338,8 +338,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200)
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && status == HttpStatus.OK_200)
return true;
return false;

View File

@ -250,6 +250,11 @@ public class MetaData implements Iterable<HttpField>
private String _protocol;
public ConnectRequest(HttpScheme scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
this(scheme == null ? null : scheme.asString(), authority, path, fields, protocol);
}
public ConnectRequest(String scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
super(HttpMethod.CONNECT.asString(), scheme, authority, path, HttpVersion.HTTP_2, fields, Long.MIN_VALUE);
_protocol = protocol;

View File

@ -97,6 +97,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private int initialSessionRecvWindow;
private int writeThreshold;
private boolean pushEnabled;
private boolean connectProtocolEnabled;
private long idleTime;
private GoAwayFrame closeFrame;
@ -370,6 +371,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
generator.setMaxHeaderListSize(value);
break;
}
case SettingsFrame.ENABLE_CONNECT_PROTOCOL:
{
boolean enabled = value == 1;
if (LOG.isDebugEnabled())
LOG.debug("{} CONNECT protocol for {}", enabled ? "Enabling" : "Disabling", this);
connectProtocolEnabled = enabled;
break;
}
default:
{
if (LOG.isDebugEnabled())
@ -906,6 +915,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return pushEnabled;
}
@ManagedAttribute(value = "Whether CONNECT requests supports a protocol", readonly = true)
public boolean isConnectProtocolEnabled()
{
return connectProtocolEnabled;
}
public void setConnectProtocolEnabled(boolean connectProtocolEnabled)
{
this.connectProtocolEnabled = connectProtocolEnabled;
}
/**
* A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
* This method is invoked when the TCP FIN is received, or when an exception is

View File

@ -372,16 +372,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
dataProcess = proceed = dataDemand > 0;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this);
if (initial)
{
if (LOG.isDebugEnabled())
LOG.debug("Starting data processing of {} for {}", frame, this);
notifyBeforeData(this);
try (AutoLock l = lock.lock())
{
dataProcess = proceed = dataDemand > 0;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} data processing of {} for {}", proceed ? "Proceeding" : "Stalling", frame, this);
if (proceed)
processData();
}

View File

@ -30,6 +30,7 @@ public class SettingsFrame extends Frame
public static final int INITIAL_WINDOW_SIZE = 4;
public static final int MAX_FRAME_SIZE = 5;
public static final int MAX_HEADER_LIST_SIZE = 6;
public static final int ENABLE_CONNECT_PROTOCOL = 8;
private final Map<Integer, Integer> settings;
private final boolean reply;

View File

@ -177,22 +177,22 @@ public class MetaDataBuilder
_contentLength = field.getLongValue();
_fields.add(field);
break;
case TE:
if ("trailers".equalsIgnoreCase(value))
_fields.add(field);
else
streamException("Unsupported TE value '%s'", value);
break;
case CONNECTION:
if ("TE".equalsIgnoreCase(value))
_fields.add(field);
else
streamException("Connection specific field '%s'", header);
break;
break;
default:
default:
if (name.charAt(0) == ':')
streamException("Unknown pseudo header '%s'", name);
else
@ -238,7 +238,7 @@ public class MetaDataBuilder
_streamException.addSuppressed(new Throwable());
throw _streamException;
}
if (_request && _response)
throw new HpackException.StreamException("Request and Response headers");
@ -268,7 +268,7 @@ public class MetaDataBuilder
throw new HpackException.StreamException("No Status");
return new MetaData.Response(HttpVersion.HTTP_2, _status, fields, _contentLength);
}
return new MetaData(HttpVersion.HTTP_2, fields, _contentLength);
}
finally

View File

@ -32,7 +32,9 @@ import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
@ -88,6 +90,18 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return send(channel, exchange);
}
@Override
protected void normalizeRequest(Request request)
{
super.normalizeRequest(request);
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_2);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}
protected HttpChannelOverHTTP2 acquireHttpChannel()
{
HttpChannelOverHTTP2 channel = idleChannels.poll();

View File

@ -27,10 +27,12 @@ import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
@ -110,7 +112,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
if (LOG.isDebugEnabled())
LOG.debug("Successful HTTP2 tunnel on {} via {}", stream, endPoint);
((IStream)stream).setAttachment(endPoint);
httpRequest.getConversation().setAttribute(EndPoint.class.getName(), endPoint);
HttpConversation conversation = httpRequest.getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName());
if (upgrader != null)
upgrader.upgrade(httpResponse, endPoint);
}
if (responseHeaders(exchange))

View File

@ -59,7 +59,16 @@ public class HttpSenderOverHTTP2 extends HttpSender
MetaData.Request metaData;
if (isTunnel)
{
metaData = new MetaData.Request(request.getMethod(), null, new HostPortHttpField(request.getPath()), null, HttpVersion.HTTP_2, request.getHeaders());
String upgradeProtocol = request.getUpgradeProtocol();
if (upgradeProtocol == null)
{
metaData = new MetaData.ConnectRequest((String)null, new HostPortHttpField(request.getPath()), null, request.getHeaders(), null);
}
else
{
HostPortHttpField authority = new HostPortHttpField(request.getHost(), request.getPort());
metaData = new MetaData.ConnectRequest(request.getScheme(), authority, request.getPath(), request.getHeaders(), upgradeProtocol);
}
}
else
{

View File

@ -60,6 +60,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private int maxFrameLength = Frame.DEFAULT_MAX_LENGTH;
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private boolean connectProtocolEnabled = true;
private RateControl.Factory rateControlFactory = new WindowRateControl.Factory(20);
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
@ -185,6 +186,17 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxSettingsKeys = maxSettingsKeys;
}
@ManagedAttribute("Whether CONNECT requests supports a protocol")
public boolean isConnectProtocolEnabled()
{
return connectProtocolEnabled;
}
public void setConnectProtocolEnabled(boolean connectProtocolEnabled)
{
this.connectProtocolEnabled = connectProtocolEnabled;
}
/**
* @return the factory that creates RateControl objects
*/
@ -237,6 +249,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
settings.put(SettingsFrame.MAX_HEADER_LIST_SIZE, getHttpConfiguration().getRequestHeaderSize());
settings.put(SettingsFrame.ENABLE_CONNECT_PROTOCOL, isConnectProtocolEnabled() ? 1 : 0);
return settings;
}
@ -259,6 +272,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
session.setWriteThreshold(getHttpConfiguration().getOutputBufferSize());
session.setConnectProtocolEnabled(isConnectProtocolEnabled());
ServerParser parser = newServerParser(connector, session, getRateControlFactory().newRateControl(endPoint));
parser.setMaxFrameLength(getMaxFrameLength());

View File

@ -102,6 +102,15 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return this;
}
@Override
public void onBeforeData(Stream stream)
{
// Do not notify DATA frame listeners until demanded.
// This allows CONNECT requests with pseudo header :protocol
// (e.g. WebSocket over HTTP/2) to buffer DATA frames
// until they upgrade and are ready to process them.
}
@Override
public boolean onIdleTimeout(Session session)
{

View File

@ -108,6 +108,16 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (stream != null)
{
onStreamOpened(stream);
if (metaData instanceof MetaData.ConnectRequest)
{
if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null)
{
stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
}
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
@ -140,8 +139,13 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
onRequestComplete();
}
boolean connect = request instanceof MetaData.ConnectRequest;
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue && !HttpMethod.CONNECT.is(request.getMethod());
!endStream && !_expect100Continue && !connect;
// Delay the demand of DATA frames for CONNECT with :protocol.
if (!connect || request.getProtocol() == null)
getStream().demand(1);
if (LOG.isDebugEnabled())
{

View File

@ -318,12 +318,19 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
Request request = channel.getRequest();
if (request.getHttpInput().hasContent())
return channel.sendErrorOrAbort("Unexpected content in CONNECT request");
Connection connection = (Connection)request.getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
EndPoint endPoint = connection.getEndPoint();
endPoint.upgrade(connection);
stream.setAttachment(endPoint);
if (request.getHttpInput().hasContent())
return channel.sendErrorOrAbort("Unexpected content in CONNECT request");
// Only now that we have switched the attachment,
// we can demand DATA frames to process them.
stream.demand(1);
if (LOG.isDebugEnabled())
LOG.debug("Upgrading to {}", connection);
return false;
}
@ -333,6 +340,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
Object attachment = stream.getAttachment();
if (attachment instanceof HttpChannelOverHTTP2)
{
// TODO: we used to "fake" a 101 response to upgrade the endpoint
// but we don't anymore, so this code should be deleted.
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)attachment;
if (channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{

View File

@ -412,6 +412,13 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
return true;
}
@Override
protected boolean checkAndPrepareUpgrade()
{
// TODO: move the code from HttpConnection.upgrade() here?
return false;
}
/**
* <p>Attempts to perform an HTTP/1.1 upgrade.</p>
* <p>The upgrade looks up a {@link ConnectionFactory.Upgrading} from the connector

View File

@ -378,7 +378,7 @@ public class Session implements SessionHandler.SessionIf
try
{
HttpSessionEvent event = new HttpSessionEvent(this);
for (String name : _sessionData.getKeys())
for (String name : _sessionData.getKeys())
{
Object value = _sessionData.getAttribute(name);
if (value instanceof HttpSessionActivationListener)

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.javax.client;
import java.net.URI;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
@ -43,11 +43,11 @@ public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
}
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public void upgrade(HttpResponse response, EndPoint endPoint)
{
frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
super.upgrade(response, endPoint);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.javax.tests;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -53,7 +54,7 @@ public class LocalFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
public LocalFuzzer(Provider provider, CharSequence requestPath) throws Exception
{
this(provider, requestPath, UpgradeUtils.newDefaultUpgradeRequestHeaders());
this(provider, requestPath, new HashMap<>());
}
public LocalFuzzer(Provider provider, CharSequence requestPath, Map<String, String> headers) throws Exception

View File

@ -208,9 +208,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
}
@Override
protected void customize(EndPoint endp)
protected void customize(EndPoint endPoint)
{
frameCapture.setEndPoint(endp);
frameCapture.setEndPoint(endPoint);
futureCapture.complete(frameCapture);
}

View File

@ -19,9 +19,6 @@
package org.eclipse.jetty.websocket.javax.tests;
import java.util.Map;
import java.util.TreeMap;
import org.eclipse.jetty.http.HttpHeader;
public class UpgradeUtils
{
@ -31,32 +28,9 @@ public class UpgradeUtils
upgradeRequest.append("GET ");
upgradeRequest.append(requestPath == null ? "/" : requestPath);
upgradeRequest.append(" HTTP/1.1\r\n");
headers.entrySet().stream().forEach(e ->
headers.entrySet().forEach(e ->
upgradeRequest.append(e.getKey()).append(": ").append(e.getValue()).append("\r\n"));
upgradeRequest.append("\r\n");
return upgradeRequest.toString();
}
public static String generateUpgradeRequest()
{
return generateUpgradeRequest("/", newDefaultUpgradeRequestHeaders());
}
public static String generateUpgradeRequest(CharSequence requestPath)
{
return generateUpgradeRequest(requestPath, newDefaultUpgradeRequestHeaders());
}
public static Map<String, String> newDefaultUpgradeRequestHeaders()
{
Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
headers.put("Host", "local");
headers.put("Connection", "Upgrade");
headers.put("Upgrade", "WebSocket");
headers.put(HttpHeader.SEC_WEBSOCKET_KEY.asString(), "dGhlIHNhbXBsZSBub25jZQ==");
headers.put(HttpHeader.ORIGIN.asString(), "ws://local/");
// headers.put(WSConstants.SEC_WEBSOCKET_PROTOCOL, "echo");
headers.put(HttpHeader.SEC_WEBSOCKET_VERSION.asString(), "13");
return headers;
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.javax.tests.server;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,7 +31,6 @@ import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.UpgradeUtils;
import org.eclipse.jetty.websocket.javax.tests.WSServer;
import org.eclipse.jetty.websocket.javax.tests.coders.DateDecoder;
import org.eclipse.jetty.websocket.javax.tests.coders.TimeEncoder;
@ -72,8 +72,8 @@ public class AnnotatedServerEndpointTest
private void assertResponse(String message, String expectedText) throws Exception
{
Map<String, String> upgradeRequest = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeRequest.put(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), "echo");
Map<String, String> headers = new HashMap<>();
headers.put(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), "echo");
List<Frame> send = new ArrayList<>();
send.add(new Frame(OpCode.TEXT).setPayload(message));
@ -83,7 +83,7 @@ public class AnnotatedServerEndpointTest
expect.add(new Frame(OpCode.TEXT).setPayload(expectedText));
expect.add(CloseStatus.toFrame(CloseStatus.NORMAL));
try (Fuzzer session = server.newNetworkFuzzer("/app/echo", upgradeRequest))
try (Fuzzer session = server.newNetworkFuzzer("/app/echo", headers))
{
session.sendFrames(send);
session.expect(expect);

View File

@ -12,4 +12,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.client.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.server.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.server.LEVEL=DEBUG

View File

@ -255,34 +255,6 @@ public interface UpgradeRequest
*/
void setHeaders(Map<String, List<String>> headers);
/**
* Set the HTTP Version to use.
* <p>
* As of <a href="http://tools.ietf.org/html/rfc6455">RFC6455 (December 2011)</a> this should always be
* {@code HTTP/1.1}
*
* @param httpVersion the HTTP version to use.
*/
void setHttpVersion(String httpVersion);
/**
* Set the HTTP method to use.
* <p>
* As of <a href="http://tools.ietf.org/html/rfc6455">RFC6455 (December 2011)</a> this is always {@code GET}
*
* @param method the HTTP method to use.
*/
void setMethod(String method);
/**
* Set the Request URI to use for this request.
* <p>
* Must be an absolute URI with scheme {@code 'ws'} or {@code 'wss'}
*
* @param uri the Request URI
*/
void setRequestURI(URI uri);
/**
* Set the Session associated with this request.
* <p>

View File

@ -26,9 +26,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -46,7 +48,6 @@ public final class ClientUpgradeRequest implements UpgradeRequest
private String httpVersion;
private String method;
private String host;
private boolean secure;
public ClientUpgradeRequest()
{
@ -57,12 +58,9 @@ public final class ClientUpgradeRequest implements UpgradeRequest
{
this.requestURI = uri;
String scheme = uri.getScheme();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
{
if (!HttpScheme.WS.is(scheme) || !HttpScheme.WSS.is(scheme))
throw new IllegalArgumentException("URI scheme must be 'ws' or 'wss'");
}
this.host = this.requestURI.getHost();
this.parameters.clear();
}
@Override
@ -193,11 +191,7 @@ public final class ClientUpgradeRequest implements UpgradeRequest
public String getProtocolVersion()
{
String version = getHeader("Sec-WebSocket-Version");
if (version == null)
{
return "13"; // Default
}
return version;
return Objects.requireNonNullElse(version, "13");
}
@Override
@ -288,8 +282,7 @@ public final class ClientUpgradeRequest implements UpgradeRequest
@Override
public void setHeaders(Map<String, List<String>> headers)
{
headers.clear();
this.headers.clear();
for (Map.Entry<String, List<String>> entry : headers.entrySet())
{
String name = entry.getKey();
@ -298,24 +291,6 @@ public final class ClientUpgradeRequest implements UpgradeRequest
}
}
@Override
public void setHttpVersion(String httpVersion)
{
this.httpVersion = httpVersion;
}
@Override
public void setMethod(String method)
{
this.method = method;
}
@Override
public void setRequestURI(URI uri)
{
this.requestURI = uri;
}
@Override
public void setSession(Object session)
{

View File

@ -231,24 +231,6 @@ public class DelegatedJettyClientUpgradeRequest implements UpgradeRequest
// TODO
}
@Override
public void setHttpVersion(String httpVersion)
{
// TODO
}
@Override
public void setMethod(String method)
{
}
@Override
public void setRequestURI(URI uri)
{
// TODO
}
@Override
public void setSession(Object session)
{

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
@ -89,18 +88,18 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
}
@Override
protected void customize(EndPoint endp)
protected void customize(EndPoint endPoint)
{
super.customize(endp);
handshakeRequest.configure(endp);
super.customize(endPoint);
handshakeRequest.configure(endPoint);
}
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public void upgrade(HttpResponse response, EndPoint endPoint)
{
frameHandler.setUpgradeRequest(new DelegatedJettyClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJettyClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
super.upgrade(response, endPoint);
}
@Override

View File

@ -185,24 +185,6 @@ public class DummyUpgradeRequest implements UpgradeRequest
}
@Override
public void setHttpVersion(String httpVersion)
{
}
@Override
public void setMethod(String method)
{
}
@Override
public void setRequestURI(URI uri)
{
}
@Override
public void setSession(Object session)
{

View File

@ -197,24 +197,6 @@ public class UpgradeRequestAdapter implements UpgradeRequest
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setHttpVersion(String httpVersion)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setMethod(String method)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setRequestURI(URI uri)
{
throw new UnsupportedOperationException("Not supported from Servlet API");
}
@Override
public void setSession(Object session)
{

View File

@ -19,26 +19,49 @@
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-api</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.tests</groupId>
<artifactId>jetty-http-tools</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>compile</scope>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-java-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,348 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.servlet.internal.UpgradeHttpServletRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsStringIgnoringCase;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketOverHTTP2Test
{
private Server server;
private ServerConnector connector;
private ServerConnector tlsConnector;
private WebSocketClient wsClient;
private void startServer() throws Exception
{
startServer(new TestJettyWebSocketServlet());
}
private void startServer(TestJettyWebSocketServlet servlet) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
HttpConfiguration httpConfig = new HttpConfiguration();
HttpConnectionFactory h1c = new HttpConnectionFactory(httpConfig);
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(httpConfig);
connector = new ServerConnector(server, 1, 1, h1c, h2c);
server.addConnector(connector);
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
httpsConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory h1s = new HttpConnectionFactory(httpsConfig);
HTTP2ServerConnectionFactory h2s = new HTTP2ServerConnectionFactory(httpsConfig);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
alpn.setDefaultProtocol(h1s.getProtocol());
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
tlsConnector = new ServerConnector(server, 1, 1, ssl, alpn, h1s, h2s);
server.addConnector(tlsConnector);
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(new ServletHolder(servlet), "/ws/*");
JettyWebSocketServletContainerInitializer.configure(context, null);
server.start();
}
private void startClient(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSslContextFactory(new SslContextFactory.Client(true));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, protocolFn.apply(clientConnector)));
wsClient = new WebSocketClient(httpClient);
wsClient.start();
}
@AfterEach
public void stopServer() throws Exception
{
if (server != null)
server.stop();
if (wsClient != null)
wsClient.stop();
}
@Test
public void testWebSocketOverDynamicHTTP1() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> HttpClientConnectionFactory.HTTP11);
}
@Test
public void testWebSocketOverDynamicHTTP2() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector)));
}
private void testWebSocketOverDynamicTransport(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
{
startServer();
startClient(protocolFn);
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/echo");
Session session = wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS);
String text = "websocket";
session.getRemote().sendString(text);
String message = wsEndPoint.messageQueue.poll(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(text, message);
session.close(StatusCode.NORMAL, null);
assertTrue(wsEndPoint.closeLatch.await(5, TimeUnit.SECONDS));
assertEquals(StatusCode.NORMAL, wsEndPoint.statusCode);
assertNull(wsEndPoint.error);
}
@Test
public void testConnectProtocolDisabled() throws Exception
{
startServer();
AbstractHTTP2ServerConnectionFactory h2c = connector.getBean(AbstractHTTP2ServerConnectionFactory.class);
h2c.setConnectProtocolEnabled(false);
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/echo");
ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
Throwable cause = failure.getCause();
assertThat(cause.getMessage(), containsStringIgnoringCase(ErrorCode.PROTOCOL_ERROR.name()));
}
@Test
public void testSlowWebSocketUpgradeWithHTTP2DataFramesQueued() throws Exception
{
startServer(new TestJettyWebSocketServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
super.service(request, response);
// Flush the response to the client then wait before exiting
// this method so that the client can send HTTP/2 DATA frames
// that will be processed by the server while this method sleeps.
response.flushBuffer();
Thread.sleep(1000);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
// Connect and send immediately a message, so the message
// arrives to the server while the server is still upgrading.
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("wss://localhost:" + tlsConnector.getLocalPort() + "/ws/echo");
Session session = wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS);
String text = "websocket";
session.getRemote().sendString(text);
String message = wsEndPoint.messageQueue.poll(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(text, message);
session.close(StatusCode.NORMAL, null);
assertTrue(wsEndPoint.closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testWebSocketConnectPortDoesNotExist() throws Exception
{
startServer();
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + (connector.getLocalPort()+1) + "/ws/echo");
ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
Throwable cause = failure.getCause();
assertThat(cause, instanceOf(ConnectException.class));
assertThat(cause.getMessage(), containsStringIgnoringCase("Connection refused"));
}
@Test
public void testWebSocketNotFound() throws Exception
{
startServer();
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/nothing");
ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
Throwable cause = failure.getCause();
assertThat(cause, instanceOf(UpgradeException.class));
assertThat(cause.getMessage(), containsStringIgnoringCase("Unexpected HTTP Response Status Code: 501"));
}
@Test
public void testNotNegotiated() throws Exception
{
startServer();
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/null");
ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
Throwable cause = failure.getCause();
assertThat(cause, instanceOf(UpgradeException.class));
assertThat(cause.getMessage(), containsStringIgnoringCase("Unexpected HTTP Response Status Code: 503"));
}
@Test
public void testThrowFromCreator() throws Exception
{
startServer();
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/throw");
ExecutionException failure;
try (StacklessLogging stacklessLogging = new StacklessLogging(HttpChannel.class))
{
failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
}
Throwable cause = failure.getCause();
assertThat(cause, instanceOf(UpgradeException.class));
assertThat(cause.getMessage(), containsStringIgnoringCase("Unexpected HTTP Response Status Code: 500"));
}
@Test
public void testServerConnectionClose() throws Exception
{
startServer();
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
EventSocket wsEndPoint = new EventSocket();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/connectionClose");
ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
Throwable cause = failure.getCause();
assertThat(cause, instanceOf(ClosedChannelException.class));
}
private static class TestJettyWebSocketServlet extends JettyWebSocketServlet
{
@Override
protected void configure(JettyWebSocketServletFactory factory)
{
factory.addMapping("/ws/echo", (request, response) -> new EchoSocket());
factory.addMapping("/ws/null", (request, response) -> null);
factory.addMapping("/ws/throw", (request, response) ->
{
throw new RuntimeException("throwing from creator");
});
factory.addMapping("/ws/connectionClose", (request, response) ->
{
UpgradeHttpServletRequest servletRequest = (UpgradeHttpServletRequest)request.getHttpServletRequest();
Request baseRequest = servletRequest.getBaseRequest();
baseRequest.getHttpChannel().getEndPoint().close();
return new EchoSocket();
});
}
}
}

View File

@ -7,4 +7,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
# org.eclipse.jetty.client.LEVEL=DEBUG
# org.eclipse.jetty.io.LEVEL=DEBUG
# org.eclipse.jetty.io.ManagedSelector.LEVEL=INFO
# org.eclipse.jetty.io.ManagedSelector.LEVEL=INFO

View File

@ -22,29 +22,23 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionUpgrader;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
@ -65,10 +59,9 @@ import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
public abstract class ClientUpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpConnectionUpgrader
public abstract class ClientUpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpUpgrader.Factory
{
public static ClientUpgradeRequest from(WebSocketCoreClient webSocketClient, URI requestURI, FrameHandler frameHandler)
{
@ -105,8 +98,8 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
throw new IllegalArgumentException("WebSocket URI must include a scheme");
}
String scheme = requestURI.getScheme().toLowerCase(Locale.ENGLISH);
if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
String scheme = requestURI.getScheme();
if (!HttpScheme.WS.is(scheme) && !HttpScheme.WSS.is(scheme))
{
throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
}
@ -118,10 +111,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
this.wsClient = webSocketClient;
this.futureCoreSession = new CompletableFuture<>();
method(HttpMethod.GET);
version(HttpVersion.HTTP_1_1);
getConversation().setAttribute(HttpConnectionUpgrader.class.getName(), this);
}
public void setConfiguration(FrameHandler.ConfigurationCustomizer config)
@ -159,8 +148,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
public List<String> getSubProtocols()
{
List<String> subProtocols = getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
return subProtocols;
return getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
}
public void setSubProtocols(String... protocols)
@ -197,7 +185,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
throw new IllegalArgumentException("FrameHandler could not be created", t);
}
initWebSocketHeaders();
super.send(listener);
}
@ -234,7 +221,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
}
Throwable failure = result.getFailure();
boolean wrapFailure = !((failure instanceof IOException) || (failure instanceof UpgradeException));
boolean wrapFailure = !(failure instanceof IOException) && !(failure instanceof UpgradeException);
if (wrapFailure)
failure = new UpgradeException(requestURI, responseStatusCode, responseLine, failure);
handleException(failure);
@ -265,20 +252,67 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
}
}
@SuppressWarnings("Duplicates")
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public HttpUpgrader newHttpUpgrader(HttpVersion version)
{
if (!this.getHeaders().get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket"))
throw new HttpResponseException("Not a WebSocket Upgrade", response);
if (version == HttpVersion.HTTP_1_1)
return new HttpUpgraderOverHTTP(this);
else if (version == HttpVersion.HTTP_2)
return new HttpUpgraderOverHTTP2(this);
else
throw new UnsupportedOperationException("Unsupported HTTP version for upgrade: " + version);
}
// Check the Accept hash
String reqKey = this.getHeaders().get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = WebSocketCore.hashKey(reqKey);
String respHash = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_ACCEPT);
if (expectedHash.equalsIgnoreCase(respHash) == false)
throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash (was:" + respHash + ", expected:" + expectedHash + ")", response);
/**
* Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive)
*/
protected void customize(EndPoint endPoint)
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler();
void requestComplete()
{
// Add extensions header filtering out internal extensions and internal parameters.
HttpFields headers = getHeaders();
for (ExtensionConfig config : requestedExtensions)
{
if (config.getName().startsWith("@"))
continue;
headers.add(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, config.getParameterizedNameWithoutInternalParams());
}
notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
}
private void notifyUpgradeListeners(Consumer<UpgradeListener> action)
{
for (UpgradeListener listener : upgradeListeners)
{
try
{
action.accept(listener);
}
catch (Throwable t)
{
LOG.info("Exception while invoking listener " + listener, t);
}
}
}
public void upgrade(HttpResponse response, EndPoint endPoint)
{
// Parse the Negotiated Extensions
List<ExtensionConfig> negotiatedExtensions = new ArrayList<>();
HttpField extField = response.getHeaders().getField(HttpHeader.SEC_WEBSOCKET_EXTENSIONS);
@ -349,7 +383,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
if (values != null)
{
if (values.length > 1)
throw new WebSocketException("Upgrade failed: Too many WebSocket subprotocol's in response: " + values);
throw new WebSocketException("Upgrade failed: Too many WebSocket subprotocol's in response: " + Arrays.toString(values));
else if (values.length == 1)
negotiatedSubProtocol = values[0];
}
@ -363,8 +397,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
throw new WebSocketException("Upgrade failed: subprotocol [" + negotiatedSubProtocol + "] not found in offered subprotocols " + offeredSubProtocols);
// We can upgrade
EndPoint endp = httpConnection.getEndPoint();
customize(endp);
customize(endPoint);
Request request = response.getRequest();
Negotiated negotiated = new Negotiated(
@ -378,7 +411,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
customizer.customize(coreSession);
HttpClient httpClient = wsClient.getHttpClient();
WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
WebSocketConnection wsConnection = newWebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);
notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));
@ -386,7 +419,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
// Now swap out the connection
try
{
endp.upgrade(wsConnection);
endPoint.upgrade(wsConnection);
futureCoreSession.complete(coreSession);
}
catch (Throwable t)
@ -394,88 +427,4 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
futureCoreSession.completeExceptionally(t);
}
}
/**
* Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive)
*
* @see <a href="https://github.com/eclipse/jetty.project/issues/1811">Issue #1811 - Customization of WebSocket Connections via WebSocketPolicy</a>
*/
protected void customize(EndPoint endp)
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endp, executor, scheduler, byteBufferPool, coreSession);
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler();
private final String genRandomKey()
{
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return Base64.getEncoder().encodeToString(bytes);
}
private void initWebSocketHeaders()
{
method(HttpMethod.GET);
version(HttpVersion.HTTP_1_1);
// The Upgrade Headers
setHeaderIfNotPresent(HttpHeader.UPGRADE, "websocket");
setHeaderIfNotPresent(HttpHeader.CONNECTION, "Upgrade");
// The WebSocket Headers
setHeaderIfNotPresent(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
setHeaderIfNotPresent(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
// (Per the hybi list): Add no-cache headers to avoid compatibility issue.
// There are some proxies that rewrite "Connection: upgrade"
// to "Connection: close" in the response if a request doesn't contain
// these headers.
setHeaderIfNotPresent(HttpHeader.PRAGMA, "no-cache");
setHeaderIfNotPresent(HttpHeader.CACHE_CONTROL, "no-cache");
// Add extensions header filtering out internal extensions and internal parameters.
HttpFields headers = getHeaders();
for (ExtensionConfig config : requestedExtensions)
{
if (config.getName().startsWith("@"))
continue;
headers.add(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, config.getParameterizedNameWithoutInternalParams());
}
// Notify upgrade hooks
notifyUpgradeListeners((listener) -> listener.onHandshakeRequest(this));
}
private void setHeaderIfNotPresent(HttpHeader header, String value)
{
if (!getHeaders().contains(header))
{
getHeaders().put(header, value);
}
}
private void notifyUpgradeListeners(Consumer<UpgradeListener> action)
{
for (UpgradeListener listener : upgradeListeners)
{
try
{
action.accept(listener);
}
catch (Throwable t)
{
LOG.warn("Unhandled error: " + t.getMessage(), t);
}
}
}
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.client;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
public class HttpUpgraderOverHTTP implements HttpUpgrader
{
private final ClientUpgradeRequest clientUpgradeRequest;
public HttpUpgraderOverHTTP(ClientUpgradeRequest clientUpgradeRequest)
{
this.clientUpgradeRequest = clientUpgradeRequest;
}
@Override
public void prepare(HttpRequest request)
{
request.method(HttpMethod.GET).version(HttpVersion.HTTP_1_1);
request.header(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
request.header(HttpHeader.UPGRADE, "websocket");
request.header(HttpHeader.CONNECTION, "Upgrade");
request.header(HttpHeader.SEC_WEBSOCKET_KEY, generateRandomKey());
// Per the hybi list: Add no-cache headers to avoid compatibility issue.
// There are some proxies that rewrite "Connection: upgrade" to
// "Connection: close" in the response if a request doesn't contain
// these headers.
request.header(HttpHeader.PRAGMA, "no-cache");
request.header(HttpHeader.CACHE_CONTROL, "no-cache");
// Notify the UpgradeListeners now the headers are set.
clientUpgradeRequest.requestComplete();
}
private String generateRandomKey()
{
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return new String(Base64.getEncoder().encode(bytes), StandardCharsets.US_ASCII);
}
@Override
public void upgrade(HttpResponse response, EndPoint endPoint)
{
HttpRequest request = (HttpRequest)response.getRequest();
HttpFields requestHeaders = request.getHeaders();
if (!requestHeaders.get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket"))
throw new HttpResponseException("Not a WebSocket Upgrade", response);
// Check the Accept hash
String reqKey = requestHeaders.get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = WebSocketCore.hashKey(reqKey);
String respHash = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_ACCEPT);
if (!expectedHash.equalsIgnoreCase(respHash))
throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash (was:" + respHash + ", expected:" + expectedHash + ")", response);
clientUpgradeRequest.upgrade(response, endPoint);
}
}

View File

@ -0,0 +1,54 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.client;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
public class HttpUpgraderOverHTTP2 implements HttpUpgrader
{
private final ClientUpgradeRequest clientUpgradeRequest;
public HttpUpgraderOverHTTP2(ClientUpgradeRequest clientUpgradeRequest)
{
this.clientUpgradeRequest = clientUpgradeRequest;
}
@Override
public void prepare(HttpRequest request)
{
request.method(HttpMethod.CONNECT);
request.upgradeProtocol("websocket");
request.header(HttpHeader.SEC_WEBSOCKET_VERSION, WebSocketConstants.SPEC_VERSION_STRING);
// Notify the UpgradeListeners now the headers are set.
clientUpgradeRequest.requestComplete();
}
@Override
public void upgrade(HttpResponse response, EndPoint endPoint)
{
clientUpgradeRequest.upgrade(response, endPoint);
}
}

View File

@ -23,19 +23,14 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.eclipse.jetty.websocket.core.server.internal.HandshakerSelector;
public interface Handshaker
{
static Handshaker newInstance()
{
return new RFC6455Handshaker();
return new HandshakerSelector();
}
boolean upgradeRequest(
WebSocketNegotiator negotiator,
HttpServletRequest request,
HttpServletResponse response,
FrameHandler.Customizer defaultCustomizer)
throws IOException;
boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException;
}

View File

@ -36,149 +36,25 @@ import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
public class Negotiation
public abstract class Negotiation
{
private final Request baseRequest;
private final HttpServletRequest request;
private final HttpServletResponse response;
private final List<ExtensionConfig> offeredExtensions;
private final List<String> offeredSubprotocols;
private final WebSocketComponents components;
private final String version;
private final Boolean upgrade;
private final String key;
private String version;
private List<ExtensionConfig> offeredExtensions;
private List<ExtensionConfig> negotiatedExtensions;
private String subprotocol;
private List<String> offeredProtocols;
private ExtensionStack extensionStack;
private String protocol;
/**
* @throws BadMessageException if there is any errors parsing the upgrade request
*/
public Negotiation(
Request baseRequest,
HttpServletRequest request,
HttpServletResponse response,
WebSocketComponents components) throws BadMessageException
public Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
{
this.baseRequest = baseRequest;
this.request = request;
this.response = response;
this.components = components;
Boolean upgrade = null;
String key = null;
String version = null;
QuotedCSV connectionCSVs = null;
QuotedCSV extensions = null;
QuotedCSV subprotocols = null;
try
{
for (HttpField field : baseRequest.getHttpFields())
{
if (field.getHeader() != null)
{
switch (field.getHeader())
{
case UPGRADE:
if (upgrade == null && "websocket".equalsIgnoreCase(field.getValue()))
upgrade = Boolean.TRUE;
break;
case CONNECTION:
if (connectionCSVs == null)
connectionCSVs = new QuotedCSV();
connectionCSVs.addValue(field.getValue());
break;
case SEC_WEBSOCKET_KEY:
key = field.getValue();
break;
case SEC_WEBSOCKET_VERSION:
version = field.getValue();
break;
case SEC_WEBSOCKET_EXTENSIONS:
if (extensions == null)
extensions = new QuotedCSV(field.getValue());
else
extensions.addValue(field.getValue());
break;
case SEC_WEBSOCKET_SUBPROTOCOL:
if (subprotocols == null)
subprotocols = new QuotedCSV(field.getValue());
else
subprotocols.addValue(field.getValue());
break;
default:
}
}
}
this.version = version;
this.key = key;
this.upgrade = upgrade != null && connectionCSVs != null && connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("Upgrade"));
Set<String> available = components.getExtensionRegistry().getAvailableExtensionNames();
offeredExtensions = extensions == null
? Collections.emptyList()
: extensions.getValues().stream()
.map(ExtensionConfig::parse)
.filter(ec -> available.contains(ec.getName().toLowerCase()) && !ec.getName().startsWith("@"))
.collect(Collectors.toList());
// Remove any parameters starting with "@", these are not to be negotiated by client (internal parameters).
offeredExtensions.forEach(ExtensionConfig::removeInternalParameters);
offeredSubprotocols = subprotocols == null
? Collections.emptyList()
: subprotocols.getValues();
negotiatedExtensions = new ArrayList<>();
for (ExtensionConfig config : offeredExtensions)
{
long matches = negotiatedExtensions.stream()
.filter(negotiatedConfig -> negotiatedConfig.getName().equals(config.getName())).count();
if (matches == 0)
negotiatedExtensions.add(new ExtensionConfig(config));
}
}
catch (Throwable t)
{
throw new BadMessageException("Invalid Handshake Request", t);
}
}
public String getKey()
{
return key;
}
public List<ExtensionConfig> getOfferedExtensions()
{
return offeredExtensions;
}
public void setNegotiatedExtensions(List<ExtensionConfig> extensions)
{
if (extensions == offeredExtensions)
return;
negotiatedExtensions = extensions == null ? null : new ArrayList<>(extensions);
extensionStack = null;
}
public List<ExtensionConfig> getNegotiatedExtensions()
{
return negotiatedExtensions;
}
public List<String> getOfferedSubprotocols()
{
return offeredSubprotocols;
this.components = webSocketComponents;
}
public Request getBaseRequest()
@ -196,25 +72,116 @@ public class Negotiation
return response;
}
public void setSubprotocol(String subprotocol)
public void negotiate() throws BadMessageException
{
this.subprotocol = subprotocol;
response.setHeader(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), subprotocol);
try
{
negotiateHeaders(getBaseRequest());
}
catch (Throwable x)
{
throw new BadMessageException("Invalid upgrade request", x);
}
}
public String getSubprotocol()
protected void negotiateHeaders(Request baseRequest)
{
return subprotocol;
QuotedCSV extensions = null;
QuotedCSV protocols = null;
for (HttpField field : baseRequest.getHttpFields())
{
if (field.getHeader() != null)
{
switch (field.getHeader())
{
case SEC_WEBSOCKET_VERSION:
version = field.getValue();
break;
case SEC_WEBSOCKET_EXTENSIONS:
if (extensions == null)
extensions = new QuotedCSV(field.getValue());
else
extensions.addValue(field.getValue());
break;
case SEC_WEBSOCKET_SUBPROTOCOL:
if (protocols == null)
protocols = new QuotedCSV(field.getValue());
else
protocols.addValue(field.getValue());
break;
default:
break;
}
}
}
Set<String> available = components.getExtensionRegistry().getAvailableExtensionNames();
offeredExtensions = extensions == null
? Collections.emptyList()
: extensions.getValues().stream()
.map(ExtensionConfig::parse)
.filter(ec -> available.contains(ec.getName().toLowerCase()) && !ec.getName().startsWith("@"))
.collect(Collectors.toList());
// Remove any parameters starting with "@", these are not to be negotiated by client (internal parameters).
offeredExtensions.forEach(ExtensionConfig::removeInternalParameters);
offeredProtocols = protocols == null
? Collections.emptyList()
: protocols.getValues();
negotiatedExtensions = new ArrayList<>();
for (ExtensionConfig config : offeredExtensions)
{
long matches = negotiatedExtensions.stream()
.filter(negotiatedConfig -> negotiatedConfig.getName().equals(config.getName())).count();
if (matches == 0)
negotiatedExtensions.add(new ExtensionConfig(config));
}
}
public abstract boolean validateHeaders();
public String getVersion()
{
return version;
}
public boolean isUpgrade()
public String getSubprotocol()
{
return upgrade;
return protocol;
}
public void setSubprotocol(String protocol)
{
this.protocol = protocol;
response.setHeader(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), protocol);
}
public List<String> getOfferedSubprotocols()
{
return offeredProtocols;
}
public List<ExtensionConfig> getOfferedExtensions()
{
return offeredExtensions;
}
public List<ExtensionConfig> getNegotiatedExtensions()
{
return negotiatedExtensions;
}
public void setNegotiatedExtensions(List<ExtensionConfig> extensions)
{
if (extensions == offeredExtensions)
return;
negotiatedExtensions = extensions;
extensionStack = null;
}
public ExtensionStack getExtensionStack()
@ -232,14 +199,14 @@ public class Negotiation
else
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, null);
}
return extensionStack;
}
@Override
public String toString()
{
return String.format("Negotiation@%x{uri=%s,oe=%s,op=%s}",
return String.format("%s@%x{uri=%s,oe=%s,op=%s}",
getClass().getSimpleName(),
hashCode(),
getRequest().getRequestURI(),
getOfferedExtensions(),

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.core.server;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -47,7 +48,7 @@ public class WebSocketUpgradeHandler extends HandlerWrapper
public WebSocketUpgradeHandler(WebSocketNegotiator negotiator, String... pathSpecs)
{
this.negotiator = negotiator;
this.negotiator = Objects.requireNonNull(negotiator);
addPathSpec(pathSpecs);
}

View File

@ -0,0 +1,208 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public abstract class AbstractHandshaker implements Handshaker
{
protected static final Logger LOG = Log.getLogger(AbstractHandshaker.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
{
if (!validateRequest(request))
return false;
Negotiation negotiation = newNegotiation(request, response, new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
negotiation.negotiate();
if (!validateNegotiation(negotiation))
return false;
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (!validateFrameHandler(handler, response))
return false;
// Handle error responses
Request baseRequest = negotiation.getBaseRequest();
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", request);
baseRequest.setHandled(true);
return false;
}
int httpStatus = response.getStatus();
if (httpStatus > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: invalid http code {} {}", httpStatus, request);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Validate negotiated protocol
String protocol = negotiation.getSubprotocol();
List<String> offeredProtocols = negotiation.getOfferedSubprotocols();
if (protocol != null)
{
if (!offeredProtocols.contains(protocol))
throw new WebSocketException("not upgraded: selected a protocol not present in offered protocols");
}
else
{
if (!offeredProtocols.isEmpty())
throw new WebSocketException("not upgraded: no protocol selected from offered protocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(baseRequest.getHttpURI().toURI(), protocol, baseRequest.isSecure(), extensionStack, WebSocketConstants.SPEC_VERSION_STRING);
// Create the Session
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("session {}", coreSession);
WebSocketConnection connection = createWebSocketConnection(baseRequest, coreSession);
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
HttpChannel httpChannel = baseRequest.getHttpChannel();
HttpConfiguration httpConfig = httpChannel.getHttpConfiguration();
connection.setUseInputDirectByteBuffers(httpConfig.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(httpChannel.isUseOutputDirectByteBuffers());
httpChannel.getConnector().getEventListeners().forEach(connection::addEventListener);
coreSession.setWebSocketConnection(connection);
Response baseResponse = baseRequest.getResponse();
prepareResponse(baseResponse, negotiation);
if (httpConfig.getSendServerVersion())
baseResponse.getHttpFields().put(SERVER_VERSION);
baseResponse.flushBuffer();
baseRequest.setHandled(true);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={} framehandler={}", connection, coreSession, handler);
return true;
}
protected abstract boolean validateRequest(HttpServletRequest request);
protected abstract Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents);
protected abstract boolean validateFrameHandler(FrameHandler frameHandler, HttpServletResponse response);
protected boolean validateNegotiation(Negotiation negotiation)
{
if (!negotiation.validateHeaders())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", negotiation.getBaseRequest());
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), negotiation.getBaseRequest());
return false;
}
return true;
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated);
}
protected abstract WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession);
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
protected abstract void prepareResponse(Response response, Negotiation negotiation);
}

View File

@ -0,0 +1,46 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
/**
* Selects between the two Handshaker implementations,
* RFC6455 (HTTP/1.1 WebSocket Upgrades)
* and RFC68441 (HTTP/2 WebSocket Upgrades)
*/
public class HandshakerSelector implements Handshaker
{
private final RFC6455Handshaker rfc6455 = new RFC6455Handshaker();
private final RFC8441Handshaker rfc8441 = new RFC8441Handshaker();
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
{
// Try HTTP/1.1 WS upgrade, if this fails try an HTTP/2 WS upgrade if no response was committed.
return rfc6455.upgradeRequest(negotiator, request, response, defaultCustomizer) ||
!response.isCommitted() && rfc8441.upgradeRequest(negotiator, request, response, defaultCustomizer);
}
}

View File

@ -18,248 +18,97 @@
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public final class RFC6455Handshaker implements Handshaker
public final class RFC6455Handshaker extends AbstractHandshaker
{
static final Logger LOG = Log.getLogger(RFC6455Handshaker.class);
private static final HttpField UPGRADE_WEBSOCKET = new PreEncodedHttpField(HttpHeader.UPGRADE, "WebSocket");
private static final HttpField CONNECTION_UPGRADE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeader.UPGRADE.asString());
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response,
FrameHandler.Customizer defaultCustomizer) throws IOException
@Override
protected boolean validateRequest(HttpServletRequest request)
{
final Request baseRequest = Request.getBaseRequest(request);
final HttpChannel httpChannel = baseRequest.getHttpChannel();
final Connector connector = httpChannel.getConnector();
if (negotiator == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no WebSocketNegotiator {}", baseRequest);
return false;
}
if (!HttpMethod.GET.is(request.getMethod()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded method!=GET {}", baseRequest);
LOG.debug("not upgraded method!=GET {}", request);
return false;
}
if (!HttpVersion.HTTP_1_1.equals(baseRequest.getHttpVersion()))
if (!HttpVersion.HTTP_1_1.is(request.getProtocol()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded version!=1.1 {}", baseRequest);
LOG.debug("not upgraded version!=1.1 {}", request);
return false;
}
ByteBufferPool pool = negotiator.getByteBufferPool();
if (pool == null)
pool = baseRequest.getHttpChannel().getConnector().getByteBufferPool();
Negotiation negotiation = new Negotiation(
baseRequest,
request,
response,
new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
if (!negotiation.isUpgrade())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", baseRequest);
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), baseRequest);
return false;
}
if (negotiation.getKey() == null)
throw new BadMessageException("Missing request header 'Sec-WebSocket-Key'");
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (LOG.isDebugEnabled())
LOG.debug("negotiated handler {}", handler);
// Handle error responses
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", baseRequest);
baseRequest.setHandled(true);
return false;
}
if (response.getStatus() > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: error sent {} {}", response.getStatus(), baseRequest);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Check for handler
if (handler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided {}", baseRequest);
return false;
}
// validate negotiated subprotocol
String subprotocol = negotiation.getSubprotocol();
if (subprotocol != null)
{
if (!negotiation.getOfferedSubprotocols().contains(subprotocol))
throw new WebSocketException("not upgraded: selected a subprotocol not present in offered subprotocols");
}
else
{
if (!negotiation.getOfferedSubprotocols().isEmpty())
throw new WebSocketException("not upgraded: no subprotocol selected from offered subprotocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(
baseRequest.getHttpURI().toURI(),
subprotocol,
baseRequest.isSecure(),
extensionStack,
WebSocketConstants.SPEC_VERSION_STRING);
// Create the Session
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("session {}", coreSession);
// Create a connection
WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
// TODO: perhaps use of direct buffers should be WebSocket specific
// rather than inheriting the setting from HttpConfiguration.
HttpConfiguration httpConfig = httpChannel.getHttpConfiguration();
connection.setUseInputDirectByteBuffers(httpConfig.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(httpChannel.isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
connector.getEventListeners().forEach(connection::addEventListener);
coreSession.setWebSocketConnection(connection);
// send upgrade response
Response baseResponse = baseRequest.getResponse();
baseResponse.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
baseResponse.getHttpFields().put(UPGRADE_WEBSOCKET);
baseResponse.getHttpFields().put(CONNECTION_UPGRADE);
baseResponse.getHttpFields().put(HttpHeader.SEC_WEBSOCKET_ACCEPT, WebSocketCore.hashKey(negotiation.getKey()));
// See bugs.eclipse.org/485969
if (getSendServerVersion(connector))
{
baseResponse.getHttpFields().put(SERVER_VERSION);
}
baseResponse.flushBuffer();
baseRequest.setHandled(true);
// upgrade
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={}", connection, coreSession);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
return true;
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
@Override
protected Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
{
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated);
return new RFC6455Negotiation(Request.getBaseRequest(request), request, response, webSocketComponents);
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
@Override
protected boolean validateNegotiation(Negotiation negotiation)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
private boolean getSendServerVersion(Connector connector)
{
ConnectionFactory connFactory = connector.getConnectionFactory(HttpVersion.HTTP_1_1.asString());
if (connFactory == null)
boolean result = super.validateNegotiation(negotiation);
if (!result)
return false;
if (((RFC6455Negotiation)negotiation).getKey() == null)
throw new BadMessageException("Missing request header 'Sec-WebSocket-Key'");
return true;
}
if (connFactory instanceof HttpConnectionFactory)
@Override
protected boolean validateFrameHandler(FrameHandler frameHandler, HttpServletResponse response)
{
if (frameHandler == null)
{
HttpConfiguration httpConf = ((HttpConnectionFactory)connFactory).getHttpConfiguration();
if (httpConf != null)
return httpConf.getSendServerVersion();
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided");
return false;
}
return false;
return true;
}
@Override
protected WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession)
{
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
}
@Override
protected void prepareResponse(Response response, Negotiation negotiation)
{
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
HttpFields responseFields = response.getHttpFields();
responseFields.put(UPGRADE_WEBSOCKET);
responseFields.put(CONNECTION_UPGRADE);
responseFields.put(HttpHeader.SEC_WEBSOCKET_ACCEPT, WebSocketCore.hashKey(((RFC6455Negotiation)negotiation).getKey()));
}
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.QuotedCSV;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.Negotiation;
public class RFC6455Negotiation extends Negotiation
{
private boolean successful;
private String key;
public RFC6455Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents components) throws BadMessageException
{
super(baseRequest, request, response, components);
}
@Override
protected void negotiateHeaders(Request baseRequest)
{
super.negotiateHeaders(baseRequest);
boolean upgrade = false;
QuotedCSV connectionCSVs = null;
for (HttpField field : baseRequest.getHttpFields())
{
HttpHeader header = field.getHeader();
if (header != null)
{
switch (header)
{
case UPGRADE:
upgrade = "websocket".equalsIgnoreCase(field.getValue());
break;
case CONNECTION:
if (connectionCSVs == null)
connectionCSVs = new QuotedCSV();
connectionCSVs.addValue(field.getValue());
break;
case SEC_WEBSOCKET_KEY:
key = field.getValue();
break;
default:
break;
}
}
}
successful = upgrade && connectionCSVs != null &&
connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("upgrade"));
}
@Override
public boolean validateHeaders()
{
return successful;
}
public String getKey()
{
return key;
}
}

View File

@ -0,0 +1,94 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Negotiation;
public class RFC8441Handshaker extends AbstractHandshaker
{
@Override
protected boolean validateRequest(HttpServletRequest request)
{
if (!HttpMethod.CONNECT.is(request.getMethod()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded method!=GET {}", request);
return false;
}
if (!HttpVersion.HTTP_2.is(request.getProtocol()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded HttpVersion!=2 {}", request);
return false;
}
return true;
}
@Override
protected Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
{
return new RFC8441Negotiation(Request.getBaseRequest(request), request, response, webSocketComponents);
}
@Override
protected boolean validateFrameHandler(FrameHandler frameHandler, HttpServletResponse response)
{
if (frameHandler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided");
response.setStatus(HttpStatus.SERVICE_UNAVAILABLE_503);
}
return true;
}
@Override
protected WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession)
{
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
EndPoint endPoint = httpChannel.getTunnellingEndPoint();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
}
@Override
protected void prepareResponse(Response response, Negotiation negotiation)
{
response.setStatus(HttpStatus.OK_200);
}
}

View File

@ -0,0 +1,45 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.Negotiation;
public class RFC8441Negotiation extends Negotiation
{
public RFC8441Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents components) throws BadMessageException
{
super(baseRequest, request, response, components);
}
@Override
public boolean validateHeaders()
{
MetaData.Request metaData = getBaseRequest().getMetaData();
if (metaData == null)
return false;
return "websocket".equals(metaData.getProtocol());
}
}

View File

@ -97,7 +97,7 @@ public class WebSocketNegotiationTest extends WebSocketTester
break;
case "testNotAcceptingExtensions":
negotiation.setNegotiatedExtensions(Collections.EMPTY_LIST);
negotiation.setNegotiatedExtensions(Collections.emptyList());
break;
case "testNoSubProtocolSelected":
@ -353,4 +353,4 @@ public class WebSocketNegotiationTest extends WebSocketTester
assertThat(response, containsString("400 Bad Request"));
}
}
}

View File

@ -26,15 +26,12 @@ import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
public class WebSocketServer
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private URI serverUri;
@ -59,12 +56,12 @@ public class WebSocketServer
return server;
}
public WebSocketServer(FrameHandler frameHandler) throws Exception
public WebSocketServer(FrameHandler frameHandler)
{
this(new DefaultNegotiator(frameHandler));
}
public WebSocketServer(WebSocketNegotiator negotiator) throws Exception
public WebSocketServer(WebSocketNegotiator negotiator)
{
server = new Server();
ServerConnector connector = new ServerConnector(server);

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -34,8 +33,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler;
@ -47,8 +44,6 @@ import static org.eclipse.jetty.util.Callback.NOOP;
public class ChatWebSocketServer
{
private static Logger LOG = Log.getLogger(ChatWebSocketServer.class);
private Set<MessageHandler> members = new HashSet<>();
private FrameHandler negotiate(Negotiation negotiation)
@ -77,7 +72,7 @@ public class ChatWebSocketServer
{
members.add(this);
callback.succeeded();
}, x -> callback.failed(x)));
}, callback::failed));
}
@Override

View File

@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
@ -55,6 +56,7 @@ public class UpgradeHttpServletRequest implements HttpServletRequest
{
private static final String UNSUPPORTED_WITH_WEBSOCKET_UPGRADE = "Feature unsupported with a Upgraded to WebSocket HttpServletRequest";
private final Request baseRequest;
private final ServletContext context;
private final DispatcherType dispatcher;
private final String method;
@ -110,8 +112,9 @@ public class UpgradeHttpServletRequest implements HttpServletRequest
remoteUser = httpRequest.getRemoteUser();
principal = httpRequest.getUserPrincipal();
authentication = Request.getBaseRequest(httpRequest).getAuthentication();
scope = Request.getBaseRequest(httpRequest).getUserIdentityScope();
baseRequest = Objects.requireNonNull(Request.getBaseRequest(httpRequest));
authentication = baseRequest.getAuthentication();
scope = baseRequest.getUserIdentityScope();
Enumeration<String> headerNames = httpRequest.getHeaderNames();
while (headerNames.hasMoreElements())
@ -278,6 +281,11 @@ public class UpgradeHttpServletRequest implements HttpServletRequest
return session;
}
public Request getBaseRequest()
{
return baseRequest;
}
@Override
public String getRequestedSessionId()
{