Cleaning up websocket-server module

This commit is contained in:
Joakim Erdfelt 2012-06-26 12:15:21 -07:00
parent 604f700a82
commit e1488f8c92
6 changed files with 120 additions and 226 deletions

View File

@ -20,6 +20,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.CloseFrame;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.server.callbacks.WebSocketCloseCallback;
// TODO: implement WebSocket.Connection (for API access)?
public class AsyncWebSocketConnection extends AbstractAsyncConnection

View File

@ -15,12 +15,14 @@ package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -33,69 +35,75 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.api.ExtensionConfig;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
/**
* Factory to create WebSocket connections
*/
public class WebSocketServerFactory extends AbstractLifeCycle
{
public interface Acceptor
{
/* ------------------------------------------------------------ */
/**
* <p>Checks the origin of an incoming WebSocket handshake request.</p>
* @param request the incoming HTTP upgrade request
* @param origin the origin URI
* @return boolean to indicate that the origin is acceptable.
*/
boolean checkOrigin(HttpServletRequest request, String origin);
/* ------------------------------------------------------------ */
/**
* <p>Factory method that applications needs to implement to return a
* {@link WebSocket} object.</p>
* @param request the incoming HTTP upgrade request
* @param protocol the websocket sub protocol
* @return a new {@link WebSocket} object that will handle websocket events.
*/
WebSocket doWebSocketConnect(HttpServletRequest request, String protocol);
}
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private final Queue<AsyncWebSocketConnection> connections = new ConcurrentLinkedQueue<AsyncWebSocketConnection>();
private final Queue<WebSocketServletConnection> connections = new ConcurrentLinkedQueue<WebSocketServletConnection>();
private final Map<String,Class<? extends Extension>> _extensionClasses = new HashMap<String, Class<? extends Extension>>();
// TODO: replace with ExtensionRegistry in websocket-core
private final Map<String, Class<? extends Extension>> extensionClasses = new HashMap<>();
{
_extensionClasses.put("identity",IdentityExtension.class);
_extensionClasses.put("fragment",FragmentExtension.class);
_extensionClasses.put("x-deflate-frame",DeflateFrameExtension.class);
extensionClasses.put("identity",IdentityExtension.class);
extensionClasses.put("fragment",FragmentExtension.class);
extensionClasses.put("x-deflate-frame",DeflateFrameExtension.class);
}
private final Acceptor _acceptor;
private final Map<Integer, WebSocketServer.Handshake> handshakes = new HashMap<>();
{
handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
handshakes.put(HandshakeHixie76.VERSION,new HandshakeHixie76());
}
private final WebSocketServer.Acceptor acceptor;
private final String supportedVersions;
private WebSocketPolicy policy;
public WebSocketServerFactory(Acceptor acceptor, WebSocketPolicy policy)
public WebSocketServerFactory(WebSocketServer.Acceptor acceptor, WebSocketPolicy policy)
{
this._acceptor = acceptor;
this.acceptor = acceptor;
this.policy = policy;
// Create supportedVersions
List<Integer> versions = new ArrayList<>();
for (int v : handshakes.keySet())
{
versions.add(v);
}
Collections.sort(versions,Collections.reverseOrder()); // newest first
StringBuilder rv = new StringBuilder();
for (int v : versions)
{
if (rv.length() > 0)
{
rv.append(", ");
}
rv.append(v);
}
supportedVersions = rv.toString();
}
public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response)
throws IOException
public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
{
if ("websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
{
String origin = request.getHeader("Origin");
if (origin==null)
if (origin == null)
{
origin = request.getHeader("Sec-WebSocket-Origin");
}
if (!_acceptor.checkOrigin(request,origin))
if (!acceptor.checkOrigin(request,origin))
{
response.sendError(HttpServletResponse.SC_FORBIDDEN);
return false;
@ -104,15 +112,14 @@ public class WebSocketServerFactory extends AbstractLifeCycle
// Try each requested protocol
WebSocket websocket = null;
@SuppressWarnings("unchecked")
Enumeration<String> protocols = request.getHeaders("Sec-WebSocket-Protocol");
String protocol=null;
while ((protocol==null) && (protocols!=null) && protocols.hasMoreElements())
String protocol = null;
while ((protocol == null) && (protocols != null) && protocols.hasMoreElements())
{
String candidate = protocols.nextElement();
for (String p : parseProtocols(candidate))
{
websocket = _acceptor.doWebSocketConnect(request, p);
websocket = acceptor.doWebSocketConnect(request,p);
if (websocket != null)
{
protocol = p;
@ -125,9 +132,9 @@ public class WebSocketServerFactory extends AbstractLifeCycle
if (websocket == null)
{
// Try with no protocol
websocket = _acceptor.doWebSocketConnect(request, null);
websocket = acceptor.doWebSocketConnect(request,null);
if (websocket==null)
if (websocket == null)
{
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return false;
@ -135,38 +142,38 @@ public class WebSocketServerFactory extends AbstractLifeCycle
}
// Send the upgrade
upgrade(request, response, websocket, protocol);
upgrade(request,response,websocket,protocol);
return true;
}
return false;
}
protected boolean addConnection(WebSocketServletConnection connection)
protected boolean addConnection(AsyncWebSocketConnection connection)
{
return isRunning() && connections.add(connection);
}
protected void closeConnections()
{
for (WebSocketServletConnection connection : connections)
for (AsyncWebSocketConnection connection : connections)
{
// TODO connection.shutdown();
connection.getEndPoint().close();
}
}
@Override
protected void doStop() throws Exception
{
closeConnections();
closeConnections();
}
/**
* @return A modifiable map of extension name to extension class
*/
public Map<String,Class<? extends Extension>> getExtensionClassesMap()
public Map<String, Class<? extends Extension>> getExtensionClassesMap()
{
return _extensionClasses;
return extensionClasses;
}
/**
@ -179,34 +186,22 @@ public class WebSocketServerFactory extends AbstractLifeCycle
return policy;
}
public List<Extension> initExtensions(List<String> requested,int maxDataOpcodes,int maxControlOpcodes,int maxReservedBits)
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
for (String rExt : requested)
for (ExtensionConfig cfg : requested)
{
QuotedStringTokenizer tok = new QuotedStringTokenizer(rExt,";");
String extName=tok.nextToken().trim();
Map<String,String> parameters = new HashMap<String,String>();
while (tok.hasMoreTokens())
{
QuotedStringTokenizer nv = new QuotedStringTokenizer(tok.nextToken().trim(),"=");
String name=nv.nextToken().trim();
String value=nv.hasMoreTokens()?nv.nextToken().trim():null;
parameters.put(name,value);
}
Extension extension = newExtension(cfg.getName());
Extension extension = newExtension(extName);
if (extension==null)
if (extension == null)
{
continue;
}
if (extension.init(parameters))
{
LOG.debug("add {} {}",extName,parameters);
extensions.add(extension);
}
extension.setConfig(cfg);
LOG.debug("added {}",extension);
extensions.add(extension);
}
LOG.debug("extensions={}",extensions);
return extensions;
@ -216,8 +211,8 @@ public class WebSocketServerFactory extends AbstractLifeCycle
{
try
{
Class<? extends Extension> extClass = _extensionClasses.get(name);
if (extClass!=null)
Class<? extends Extension> extClass = extensionClasses.get(name);
if (extClass != null)
{
return extClass.newInstance();
}
@ -234,107 +229,101 @@ public class WebSocketServerFactory extends AbstractLifeCycle
{
if (protocol == null)
{
return new String[]{null};
return new String[]
{ null };
}
protocol = protocol.trim();
if ((protocol == null) || (protocol.length() == 0))
{
return new String[]{null};
return new String[]
{ null };
}
String[] passed = protocol.split("\\s*,\\s*");
String[] protocols = new String[passed.length + 1];
System.arraycopy(passed, 0, protocols, 0, passed.length);
System.arraycopy(passed,0,protocols,0,passed.length);
return protocols;
}
protected boolean removeConnection(WebSocketServletConnection connection)
protected boolean removeConnection(AsyncWebSocketConnection connection)
{
return connections.remove(connection);
}
/**
* Upgrade the request/response to a WebSocket Connection.
* <p>This method will not normally return, but will instead throw a
* UpgradeConnectionException, to exit HTTP handling and initiate
* WebSocket handling of the connection.
*
* @param request The request to upgrade
* @param response The response to upgrade
* @param websocket The websocket handler implementation to use
* @param protocol The websocket protocol
* @throws IOException in case of I/O errors
* <p>
* This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the
* connection.
*
* @param request
* The request to upgrade
* @param response
* The response to upgrade
* @param websocket
* The websocket handler implementation to use
* @param acceptedSubProtocol
* The accepted websocket sub protocol
* @throws IOException
* in case of I/O errors
*/
public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String protocol)
throws IOException
public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String acceptedSubProtocol) throws IOException
{
if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
{
throw new IllegalStateException("!Upgrade:websocket");
throw new IllegalStateException("Not a 'WebSocket: Ugprade' request");
}
if (!"HTTP/1.1".equals(request.getProtocol()))
{
throw new IllegalStateException("!HTTP/1.1");
throw new IllegalStateException("Not a 'HTTP/1.1' request");
}
int draft = request.getIntHeader("Sec-WebSocket-Version");
if (draft < 0) {
int version = request.getIntHeader("Sec-WebSocket-Version");
if (version < 0)
{
// Old pre-RFC version specifications (header not present in RFC-6455)
draft = request.getIntHeader("Sec-WebSocket-Draft");
version = request.getIntHeader("Sec-WebSocket-Draft");
}
HttpConnection http = HttpConnection.getCurrentConnection();
AsyncEndPoint endp = http.getEndPoint();
List<String> extensions_requested = new ArrayList<String>();
@SuppressWarnings("unchecked")
List<ExtensionConfig> extensionsRequested = new ArrayList<>();
Enumeration<String> e = request.getHeaders("Sec-WebSocket-Extensions");
while (e.hasMoreElements())
{
QuotedStringTokenizer tok = new QuotedStringTokenizer(e.nextElement(),",");
while (tok.hasMoreTokens())
{
extensions_requested.add(tok.nextToken());
extensionsRequested.add(ExtensionConfig.parse(tok.nextToken()));
}
}
final WebSocketServletConnection connection;
switch (draft)
WebSocketServer.Handshake handshaker = handshakes.get(version);
if (handshaker == null)
{
case org.eclipse.jetty.websocket.api.WebSocket.VERSION: // RFC 6455 Version
{
// List<Extension> extensions = initExtensions(extensions_requested,
// 8 - WebSocketConnectionRFC6455.OP_EXT_DATA,
// 16 - WebSocketConnectionRFC6455.OP_EXT_CTRL,
// 3);
// connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
break;
}
default:
{
LOG.warn("Unsupported Websocket version: " + draft);
// Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
// Using the examples as outlined
response.setHeader("Sec-WebSocket-Version", "" + org.eclipse.jetty.websocket.api.WebSocket.VERSION /*+ ", 0"*/);
response.setStatus(HttpStatus.BAD_REQUEST_400);
return;
}
LOG.warn("Unsupported Websocket version: " + version);
// Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
// Using the examples as outlined
response.setHeader("Sec-WebSocket-Version",supportedVersions);
response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
return;
}
// addConnection(connection);
// Create connection
HttpConnection http = HttpConnection.getCurrentConnection();
AsyncEndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().findExecutor();
final AsyncWebSocketConnection connection = new AsyncWebSocketConnection(endp,executor,policy);
endp.setAsyncConnection(connection);
// Set the defaults
// connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
// connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
// Initialize / Negotiate Extensions
List<Extension> extensions = initExtensions(extensionsRequested);
// Let the connection finish processing the handshake
// connection.handshake(request, response, protocol);
response.flushBuffer();
// Process (version specific) handshake response
handshaker.doHandshakeResponse(request,response,extensions,acceptedSubProtocol);
// Give the connection any unused data from the HTTP connection.
// connection.fillBuffersFrom(((HttpParser)http.getParser()).getHeaderBuffer());
// connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer());
// Add connection
addConnection(connection);
// Tell jetty about the new connection
// LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection);
// request.setAttribute("org.eclipse.jetty.io.Connection", connection);
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,acceptedSubProtocol,connection);
request.setAttribute("org.eclipse.jetty.io.Connection",connection); // TODO: this still needed?
}
}

View File

@ -1,27 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public interface WebSocketServletConnection /* extends WebSocketConnection */
{
void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException;
}

View File

@ -1,70 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class WebSocketServletConnectionRFC6455 /* extends WebSocketConnectionRFC6455 implements WebSocketServletConnection */
{
private /* final */ WebSocketServerFactory factory;
/*
public WebSocketServletConnectionRFC6455(WebSocketServerFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft);
this.factory = factory;
}
*/
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
{
String key = request.getHeader("Sec-WebSocket-Key");
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
// response.addHeader("Sec-WebSocket-Accept",hashKey(key));
if (subprotocol != null)
{
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
}
/*
for (Extension ext : getExtensions())
{
response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
}
*/
response.sendError(101);
// onFrameHandshake();
// onWebSocketOpen();
}
/*
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
*/
}

View File

@ -1,8 +1,9 @@
package org.eclipse.jetty.websocket.server;
package org.eclipse.jetty.websocket.server.callbacks;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.server.AsyncWebSocketConnection;
public class WebSocketCloseCallback implements Callback<Void>
{

View File

@ -1,4 +1,4 @@
package org.eclipse.jetty.websocket.server;
package org.eclipse.jetty.websocket.server.callbacks;
import org.eclipse.jetty.util.FutureCallback;