Reduced code duplication in Handshakers and Negotiations.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-11-14 17:46:40 +01:00
parent 06ce13e226
commit 98574f28a0
9 changed files with 465 additions and 511 deletions

View File

@ -36,146 +36,25 @@ import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack; import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
public class Negotiation public abstract class Negotiation
{ {
private final Request baseRequest; private final Request baseRequest;
private final HttpServletRequest request; private final HttpServletRequest request;
private final HttpServletResponse response; private final HttpServletResponse response;
private final List<ExtensionConfig> offeredExtensions;
private final List<String> offeredSubprotocols;
private final WebSocketComponents components; private final WebSocketComponents components;
private final String version; private String version;
private final Boolean upgrade; private List<ExtensionConfig> offeredExtensions;
private final String key;
private List<ExtensionConfig> negotiatedExtensions; private List<ExtensionConfig> negotiatedExtensions;
private String subprotocol; private List<String> offeredProtocols;
private ExtensionStack extensionStack; private ExtensionStack extensionStack;
private String protocol;
/** public Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
* @throws BadMessageException if there is any errors parsing the upgrade request
*/
public Negotiation(
Request baseRequest,
HttpServletRequest request,
HttpServletResponse response,
WebSocketComponents components) throws BadMessageException
{ {
this.baseRequest = baseRequest; this.baseRequest = baseRequest;
this.request = request; this.request = request;
this.response = response; this.response = response;
this.components = components; this.components = webSocketComponents;
Boolean upgrade = null;
String key = null;
String version = null;
QuotedCSV connectionCSVs = null;
QuotedCSV extensions = null;
QuotedCSV subprotocols = null;
try
{
for (HttpField field : baseRequest.getHttpFields())
{
if (field.getHeader() != null)
{
switch (field.getHeader())
{
case UPGRADE:
if (upgrade == null && "websocket".equalsIgnoreCase(field.getValue()))
upgrade = Boolean.TRUE;
break;
case CONNECTION:
if (connectionCSVs == null)
connectionCSVs = new QuotedCSV();
connectionCSVs.addValue(field.getValue());
break;
case SEC_WEBSOCKET_KEY:
key = field.getValue();
break;
case SEC_WEBSOCKET_VERSION:
version = field.getValue();
break;
case SEC_WEBSOCKET_EXTENSIONS:
if (extensions == null)
extensions = new QuotedCSV(field.getValue());
else
extensions.addValue(field.getValue());
break;
case SEC_WEBSOCKET_SUBPROTOCOL:
if (subprotocols == null)
subprotocols = new QuotedCSV(field.getValue());
else
subprotocols.addValue(field.getValue());
break;
default:
}
}
}
this.version = version;
this.key = key;
this.upgrade = upgrade != null && connectionCSVs != null && connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("Upgrade"));
Set<String> available = components.getExtensionRegistry().getAvailableExtensionNames();
offeredExtensions = extensions == null
? Collections.emptyList()
: extensions.getValues().stream()
.map(ExtensionConfig::parse)
.filter(ec -> available.contains(ec.getName().toLowerCase()) && !ec.getName().startsWith("@"))
.collect(Collectors.toList());
offeredSubprotocols = subprotocols == null
? Collections.emptyList()
: subprotocols.getValues();
negotiatedExtensions = new ArrayList<>();
for (ExtensionConfig config : offeredExtensions)
{
long matches = negotiatedExtensions.stream()
.filter(negotiatedConfig -> negotiatedConfig.getName().equals(config.getName())).count();
if (matches == 0)
negotiatedExtensions.add(config);
}
}
catch (Throwable t)
{
throw new BadMessageException("Invalid Handshake Request", t);
}
}
public String getKey()
{
return key;
}
public List<ExtensionConfig> getOfferedExtensions()
{
return offeredExtensions;
}
public void setNegotiatedExtensions(List<ExtensionConfig> extensions)
{
if (extensions == offeredExtensions)
return;
negotiatedExtensions = extensions == null ? null : new ArrayList<>(extensions);
extensionStack = null;
}
public List<ExtensionConfig> getNegotiatedExtensions()
{
return negotiatedExtensions;
}
public List<String> getOfferedSubprotocols()
{
return offeredSubprotocols;
} }
public Request getBaseRequest() public Request getBaseRequest()
@ -193,25 +72,113 @@ public class Negotiation
return response; return response;
} }
public void setSubprotocol(String subprotocol) public void negotiate() throws BadMessageException
{ {
this.subprotocol = subprotocol; try
response.setHeader(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), subprotocol); {
negotiateHeaders(getBaseRequest());
}
catch (Throwable x)
{
throw new BadMessageException("Invalid upgrade request", x);
}
} }
public String getSubprotocol() protected void negotiateHeaders(Request baseRequest)
{ {
return subprotocol; QuotedCSV extensions = null;
QuotedCSV protocols = null;
for (HttpField field : baseRequest.getHttpFields())
{
if (field.getHeader() != null)
{
switch (field.getHeader())
{
case SEC_WEBSOCKET_VERSION:
version = field.getValue();
break;
case SEC_WEBSOCKET_EXTENSIONS:
if (extensions == null)
extensions = new QuotedCSV(field.getValue());
else
extensions.addValue(field.getValue());
break;
case SEC_WEBSOCKET_SUBPROTOCOL:
if (protocols == null)
protocols = new QuotedCSV(field.getValue());
else
protocols.addValue(field.getValue());
break;
default:
break;
}
}
}
Set<String> available = components.getExtensionRegistry().getAvailableExtensionNames();
offeredExtensions = extensions == null
? Collections.emptyList()
: extensions.getValues().stream()
.map(ExtensionConfig::parse)
.filter(ec -> available.contains(ec.getName().toLowerCase()) && !ec.getName().startsWith("@"))
.collect(Collectors.toList());
offeredProtocols = protocols == null
? Collections.emptyList()
: protocols.getValues();
negotiatedExtensions = new ArrayList<>();
for (ExtensionConfig config : offeredExtensions)
{
long matches = negotiatedExtensions.stream()
.filter(negotiatedConfig -> negotiatedConfig.getName().equals(config.getName())).count();
if (matches == 0)
negotiatedExtensions.add(config);
}
} }
public abstract boolean isSuccessful();
public String getVersion() public String getVersion()
{ {
return version; return version;
} }
public boolean isUpgrade() public String getSubprotocol()
{ {
return upgrade; return protocol;
}
public void setSubprotocol(String protocol)
{
this.protocol = protocol;
response.setHeader(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL.asString(), protocol);
}
public List<String> getOfferedSubprotocols()
{
return offeredProtocols;
}
public List<ExtensionConfig> getOfferedExtensions()
{
return offeredExtensions;
}
public List<ExtensionConfig> getNegotiatedExtensions()
{
return negotiatedExtensions;
}
public void setNegotiatedExtensions(List<ExtensionConfig> extensions)
{
if (extensions == offeredExtensions)
return;
negotiatedExtensions = extensions;
extensionStack = null;
} }
public ExtensionStack getExtensionStack() public ExtensionStack getExtensionStack()
@ -229,14 +196,14 @@ public class Negotiation
else else
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, null); baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, null);
} }
return extensionStack; return extensionStack;
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("Negotiation@%x{uri=%s,oe=%s,op=%s}", return String.format("%s@%x{uri=%s,oe=%s,op=%s}",
getClass().getSimpleName(),
hashCode(), hashCode(),
getRequest().getRequestURI(), getRequest().getRequestURI(),
getOfferedExtensions(), getOfferedExtensions(),

View File

@ -0,0 +1,211 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public abstract class AbstractHandshaker implements Handshaker
{
protected static final Logger LOG = Log.getLogger(RFC8441Handshaker.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
{
if (!validateRequest(request))
return false;
Negotiation negotiation = newNegotiation(request, response, new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
negotiation.negotiate();
if (!validateNegotiation(negotiation))
return false;
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (handler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided {}", request);
return false;
}
// Handle error responses
Request baseRequest = negotiation.getBaseRequest();
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", request);
baseRequest.setHandled(true);
return false;
}
int httpStatus = response.getStatus();
if (httpStatus > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: invalid http code {} {}", httpStatus, request);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Validate negotiated protocol
String protocol = negotiation.getSubprotocol();
List<String> offeredProtocols = negotiation.getOfferedSubprotocols();
if (protocol != null)
{
if (!offeredProtocols.contains(protocol))
throw new WebSocketException("not upgraded: selected a protocol not present in offered protocols");
}
else
{
if (!offeredProtocols.isEmpty())
throw new WebSocketException("not upgraded: no protocol selected from offered protocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(baseRequest.getHttpURI().toURI(), protocol, baseRequest.isSecure(), extensionStack, WebSocketConstants.SPEC_VERSION_STRING);
// Create the Session
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("session {}", coreSession);
WebSocketConnection connection = createWebSocketConnection(baseRequest, coreSession);
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
HttpChannel httpChannel = baseRequest.getHttpChannel();
HttpConfiguration httpConfig = httpChannel.getHttpConfiguration();
connection.setUseInputDirectByteBuffers(httpConfig.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(httpChannel.isUseOutputDirectByteBuffers());
httpChannel.getConnector().getEventListeners().forEach(connection::addEventListener);
coreSession.setWebSocketConnection(connection);
Response baseResponse = baseRequest.getResponse();
prepareResponse(baseResponse, negotiation);
if (httpConfig.getSendServerVersion())
baseResponse.getHttpFields().put(SERVER_VERSION);
baseResponse.flushBuffer();
baseRequest.setHandled(true);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={} framehandler={}", connection, coreSession, handler);
return true;
}
protected abstract boolean validateRequest(HttpServletRequest request);
protected abstract Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents);
protected boolean validateNegotiation(Negotiation negotiation)
{
if (!negotiation.isSuccessful())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", negotiation.getBaseRequest());
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), negotiation.getBaseRequest());
return false;
}
return true;
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated);
}
protected abstract WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession);
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
protected abstract void prepareResponse(Response response, Negotiation negotiation);
}

View File

@ -18,54 +18,33 @@
package org.eclipse.jetty.websocket.core.server.internal; package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection; import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore; import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession; import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public final class RFC6455Handshaker implements Handshaker public final class RFC6455Handshaker extends AbstractHandshaker
{ {
static final Logger LOG = Log.getLogger(RFC6455Handshaker.class);
private static final HttpField UPGRADE_WEBSOCKET = new PreEncodedHttpField(HttpHeader.UPGRADE, "WebSocket"); private static final HttpField UPGRADE_WEBSOCKET = new PreEncodedHttpField(HttpHeader.UPGRADE, "WebSocket");
private static final HttpField CONNECTION_UPGRADE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeader.UPGRADE.asString()); private static final HttpField CONNECTION_UPGRADE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeader.UPGRADE.asString());
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, @Override
FrameHandler.Customizer defaultCustomizer) throws IOException protected boolean validateRequest(HttpServletRequest request)
{ {
if (!HttpMethod.GET.is(request.getMethod())) if (!HttpMethod.GET.is(request.getMethod()))
{ {
@ -74,176 +53,48 @@ public final class RFC6455Handshaker implements Handshaker
return false; return false;
} }
final Request baseRequest = Request.getBaseRequest(request); if (!HttpVersion.HTTP_1_1.is(request.getProtocol()))
if (!HttpVersion.HTTP_1_1.equals(baseRequest.getHttpVersion()))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("not upgraded version!=1.1 {}", baseRequest); LOG.debug("not upgraded version!=1.1 {}", request);
return false; return false;
} }
Negotiation negotiation = new Negotiation(baseRequest, request, response, new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
if (!negotiation.isUpgrade())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", baseRequest);
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), baseRequest);
return false;
}
if (negotiation.getKey() == null)
throw new BadMessageException("Missing request header 'Sec-WebSocket-Key'");
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (LOG.isDebugEnabled())
LOG.debug("negotiated handler {}", handler);
// Handle error responses
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", baseRequest);
baseRequest.setHandled(true);
return false;
}
if (response.getStatus() > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: error sent {} {}", response.getStatus(), baseRequest);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Check for handler
if (handler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided {}", baseRequest);
return false;
}
// validate negotiated subprotocol
String subprotocol = negotiation.getSubprotocol();
if (subprotocol != null)
{
if (!negotiation.getOfferedSubprotocols().contains(subprotocol))
throw new WebSocketException("not upgraded: selected a subprotocol not present in offered subprotocols");
}
else
{
if (!negotiation.getOfferedSubprotocols().isEmpty())
throw new WebSocketException("not upgraded: no subprotocol selected from offered subprotocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(
baseRequest.getHttpURI().toURI(),
subprotocol,
baseRequest.isSecure(),
extensionStack,
WebSocketConstants.SPEC_VERSION_STRING);
// Create the Session
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("session {}", coreSession);
// Create a connection
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
// TODO: perhaps use of direct buffers should be WebSocket specific
// rather than inheriting the setting from HttpConfiguration.
HttpConfiguration httpConfig = httpChannel.getHttpConfiguration();
connection.setUseInputDirectByteBuffers(httpConfig.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(httpChannel.isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
connector.getEventListeners().forEach(connection::addEventListener);
coreSession.setWebSocketConnection(connection);
// send upgrade response
Response baseResponse = baseRequest.getResponse();
baseResponse.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
baseResponse.getHttpFields().put(UPGRADE_WEBSOCKET);
baseResponse.getHttpFields().put(CONNECTION_UPGRADE);
baseResponse.getHttpFields().put(HttpHeader.SEC_WEBSOCKET_ACCEPT, WebSocketCore.hashKey(negotiation.getKey()));
// See bugs.eclipse.org/485969
if (getSendServerVersion(connector))
{
baseResponse.getHttpFields().put(SERVER_VERSION);
}
baseResponse.flushBuffer();
baseRequest.setHandled(true);
// upgrade
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={}", connection, coreSession);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
return true; return true;
} }
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated) @Override
protected Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
{ {
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated); return new RFC6544Negotiation(Request.getBaseRequest(request), request, response, webSocketComponents);
} }
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession) @Override
protected boolean validateNegotiation(Negotiation negotiation)
{ {
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession); boolean result = super.validateNegotiation(negotiation);
} if (!result)
private boolean getSendServerVersion(Connector connector)
{
ConnectionFactory connFactory = connector.getConnectionFactory(HttpVersion.HTTP_1_1.asString());
if (connFactory == null)
return false; return false;
if (((RFC6544Negotiation)negotiation).getKey() == null)
throw new BadMessageException("Missing request header 'Sec-WebSocket-Key'");
return true;
}
if (connFactory instanceof HttpConnectionFactory) @Override
{ protected WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession)
HttpConfiguration httpConf = ((HttpConnectionFactory)connFactory).getHttpConfiguration(); {
if (httpConf != null) HttpChannel httpChannel = baseRequest.getHttpChannel();
return httpConf.getSendServerVersion(); Connector connector = httpChannel.getConnector();
} return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
return false; }
@Override
protected void prepareResponse(Response response, Negotiation negotiation)
{
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
HttpFields responseFields = response.getHttpFields();
responseFields.put(UPGRADE_WEBSOCKET);
responseFields.put(CONNECTION_UPGRADE);
responseFields.put(HttpHeader.SEC_WEBSOCKET_ACCEPT, WebSocketCore.hashKey(((RFC6544Negotiation)negotiation).getKey()));
} }
} }

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.server.internal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.QuotedCSV;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.Negotiation;
public class RFC6544Negotiation extends Negotiation
{
private boolean successful;
private String key;
public RFC6544Negotiation(Request baseRequest, HttpServletRequest request, HttpServletResponse response, WebSocketComponents components) throws BadMessageException
{
super(baseRequest, request, response, components);
}
@Override
protected void negotiateHeaders(Request baseRequest)
{
super.negotiateHeaders(baseRequest);
boolean upgrade = false;
QuotedCSV connectionCSVs = null;
for (HttpField field : baseRequest.getHttpFields())
{
HttpHeader header = field.getHeader();
if (header != null)
{
switch (header)
{
case UPGRADE:
upgrade = "websocket".equalsIgnoreCase(field.getValue());
break;
case CONNECTION:
if (connectionCSVs == null)
connectionCSVs = new QuotedCSV();
connectionCSVs.addValue(field.getValue());
break;
case SEC_WEBSOCKET_KEY:
key = field.getValue();
break;
default:
break;
}
}
}
successful = upgrade && connectionCSVs != null &&
connectionCSVs.getValues().stream().anyMatch(s -> s.equalsIgnoreCase("upgrade"));
}
@Override
public boolean isSuccessful()
{
return successful;
}
public String getKey()
{
return key;
}
}

View File

@ -18,220 +18,62 @@
package org.eclipse.jetty.websocket.core.server.internal; package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection; import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession; import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
public class RFC8441Handshaker implements Handshaker public class RFC8441Handshaker extends AbstractHandshaker
{ {
static final Logger LOG = Log.getLogger(RFC8441Handshaker.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override @Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException protected boolean validateRequest(HttpServletRequest request)
{ {
if (!HttpMethod.CONNECT.is(request.getMethod())) if (!HttpMethod.CONNECT.is(request.getMethod()))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("not upgraded method!=GET {}", request.toString()); LOG.debug("not upgraded method!=GET {}", request);
return false; return false;
} }
Request baseRequest = Request.getBaseRequest(request); if (!HttpVersion.HTTP_2.is(request.getProtocol()))
if (!HttpVersion.HTTP_2.equals(baseRequest.getHttpVersion()))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("not upgraded HttpVersion!=2 {}", baseRequest); LOG.debug("not upgraded HttpVersion!=2 {}", request);
return false; return false;
} }
Negotiation negotiation = new RFC8441Negotiation(baseRequest, request, response, new WebSocketComponents());
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
if (!negotiation.isUpgrade())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no upgrade header or connection upgrade", baseRequest);
return false;
}
if (!WebSocketConstants.SPEC_VERSION_STRING.equals(negotiation.getVersion()))
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: unsupported version {} {}", negotiation.getVersion(), baseRequest);
return false;
}
// Negotiate the FrameHandler
FrameHandler handler = negotiator.negotiate(negotiation);
if (LOG.isDebugEnabled())
LOG.debug("negotiated handler {}", handler);
// Handle error responses
if (response.isCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: response committed {}", baseRequest);
baseRequest.setHandled(true);
return false;
}
if (response.getStatus() > 200)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: error sent {} {}", response.getStatus(), baseRequest);
response.flushBuffer();
baseRequest.setHandled(true);
return false;
}
// Check for handler
if (handler == null)
{
if (LOG.isDebugEnabled())
LOG.debug("not upgraded: no frame handler provided {}", baseRequest);
return false;
}
// validate negotiated subprotocol
String subprotocol = negotiation.getSubprotocol();
if (subprotocol != null)
{
if (!negotiation.getOfferedSubprotocols().contains(subprotocol))
throw new WebSocketException("not upgraded: selected a subprotocol not present in offered subprotocols");
}
else
{
if (!negotiation.getOfferedSubprotocols().isEmpty())
throw new WebSocketException("not upgraded: no subprotocol selected from offered subprotocols");
}
// validate negotiated extensions
for (ExtensionConfig config : negotiation.getNegotiatedExtensions())
{
if (config.getName().startsWith("@"))
continue;
long matches = negotiation.getOfferedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches < 1)
throw new WebSocketException("Upgrade failed: negotiated extension not requested");
matches = negotiation.getNegotiatedExtensions().stream().filter(c -> config.getName().equalsIgnoreCase(c.getName())).count();
if (matches > 1)
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
Negotiated negotiated = new Negotiated(
baseRequest.getHttpURI().toURI(),
subprotocol,
baseRequest.isSecure(),
extensionStack,
WebSocketConstants.SPEC_VERSION_STRING);
// Create the Channel
WebSocketCoreSession coreSession = newWebSocketCoreSession(handler, negotiated);
if (defaultCustomizer != null)
defaultCustomizer.customize(coreSession);
negotiator.customize(coreSession);
if (LOG.isDebugEnabled())
LOG.debug("coreSession {}", coreSession);
// Create the Connection
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
EndPoint endPoint = httpChannel.getTunnellingEndPoint();
WebSocketConnection connection = newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
throw new WebSocketException("not upgraded: no connection");
for (Connection.Listener listener : connector.getBeans(Connection.Listener.class))
{
connection.addEventListener(listener);
}
coreSession.setWebSocketConnection(connection);
// send upgrade response
Response baseResponse = baseRequest.getResponse();
baseResponse.setStatus(HttpStatus.OK_200);
// See bugs.eclipse.org/485969
if (getSendServerVersion(connector))
baseResponse.getHttpFields().put(SERVER_VERSION);
baseRequest.setHandled(true);
// upgrade
if (LOG.isDebugEnabled())
LOG.debug("upgrade connection={} session={}", connection, coreSession);
baseRequest.setAttribute(HttpTransport.UPGRADE_CONNECTION_ATTRIBUTE, connection);
return true; return true;
} }
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated) @Override
protected Negotiation newNegotiation(HttpServletRequest request, HttpServletResponse response, WebSocketComponents webSocketComponents)
{ {
return new WebSocketCoreSession(handler, Behavior.SERVER, negotiated); return new RFC8441Negotiation(Request.getBaseRequest(request), request, response, webSocketComponents);
} }
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession) @Override
protected WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession)
{ {
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession); HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
EndPoint endPoint = httpChannel.getTunnellingEndPoint();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
} }
private boolean getSendServerVersion(Connector connector) @Override
protected void prepareResponse(Response response, Negotiation negotiation)
{ {
ConnectionFactory connFactory = connector.getConnectionFactory(HttpVersion.HTTP_2.asString()); response.setStatus(HttpStatus.OK_200);
if (connFactory == null)
return false;
if (connFactory instanceof HttpConnectionFactory)
{
HttpConfiguration httpConf = ((HttpConnectionFactory)connFactory).getHttpConfiguration();
if (httpConf != null)
return httpConf.getSendServerVersion();
}
return false;
} }
} }

View File

@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
@ -34,11 +35,11 @@ public class RFC8441Negotiation extends Negotiation
} }
@Override @Override
public boolean isUpgrade() public boolean isSuccessful()
{ {
if (!getBaseRequest().hasMetaData()) MetaData.Request metaData = getBaseRequest().getMetaData();
if (metaData == null)
return false; return false;
return "websocket".equals(metaData.getProtocol());
return "websocket".equals(getBaseRequest().getMetaData().getProtocol());
} }
} }

View File

@ -97,7 +97,7 @@ public class WebSocketNegotiationTest extends WebSocketTester
break; break;
case "testNotAcceptingExtensions": case "testNotAcceptingExtensions":
negotiation.setNegotiatedExtensions(Collections.EMPTY_LIST); negotiation.setNegotiatedExtensions(Collections.emptyList());
break; break;
case "testNoSubProtocolSelected": case "testNoSubProtocolSelected":
@ -353,4 +353,4 @@ public class WebSocketNegotiationTest extends WebSocketTester
assertThat(response, containsString("400 Bad Request")); assertThat(response, containsString("400 Bad Request"));
} }
} }

View File

@ -26,15 +26,12 @@ import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
public class WebSocketServer public class WebSocketServer
{ {
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server; private final Server server;
private URI serverUri; private URI serverUri;
@ -59,12 +56,12 @@ public class WebSocketServer
return server; return server;
} }
public WebSocketServer(FrameHandler frameHandler) throws Exception public WebSocketServer(FrameHandler frameHandler)
{ {
this(new DefaultNegotiator(frameHandler)); this(new DefaultNegotiator(frameHandler));
} }
public WebSocketServer(WebSocketNegotiator negotiator) throws Exception public WebSocketServer(WebSocketNegotiator negotiator)
{ {
server = new Server(); server = new Server();
ServerConnector connector = new ServerConnector(server); ServerConnector connector = new ServerConnector(server);

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -34,8 +33,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler; import org.eclipse.jetty.websocket.core.MessageHandler;
@ -47,8 +44,6 @@ import static org.eclipse.jetty.util.Callback.NOOP;
public class ChatWebSocketServer public class ChatWebSocketServer
{ {
private static Logger LOG = Log.getLogger(ChatWebSocketServer.class);
private Set<MessageHandler> members = new HashSet<>(); private Set<MessageHandler> members = new HashSet<>();
private FrameHandler negotiate(Negotiation negotiation) private FrameHandler negotiate(Negotiation negotiation)
@ -77,7 +72,7 @@ public class ChatWebSocketServer
{ {
members.add(this); members.add(this);
callback.succeeded(); callback.succeeded();
}, x -> callback.failed(x))); }, callback::failed));
} }
@Override @Override