From 970d0764f7e7fb49eeb57c9e120a1de50d522fe2 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 25 Jun 2012 13:48:05 -0700 Subject: [PATCH] Working towards a happy websocket-server connection + handshake --- .../server/AsyncWebSocketConnection.java | 136 +++++++++- .../server/WebSocketCloseCallback.java | 25 ++ .../websocket/server/WebSocketHandler.java | 2 +- .../websocket/server/WebSocketHixie76.java | 9 - .../server/WebSocketOpenCallback.java | 18 ++ .../websocket/server/WebSocketServer.java | 60 +++++ .../server/WebSocketServerFactory.java | 245 +++++++++--------- .../websocket/server/WebSocketServlet.java | 81 ++++-- .../server/handshake/HandshakeHixie76.java | 29 +++ .../server/handshake/HandshakeRFC6455.java | 50 ++++ .../server/WebSocketServletRFCTest.java | 107 ++++++-- .../server/examples/MyEchoServlet.java | 19 ++ .../server/examples/MyEchoSocket.java | 36 +++ .../server/helper/FrameParseCapture.java | 94 +++++++ 14 files changed, 723 insertions(+), 188 deletions(-) create mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketCloseCallback.java delete mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHixie76.java create mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketOpenCallback.java create mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServer.java create mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeHixie76.java create mode 100644 jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeRFC6455.java create mode 100644 jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoServlet.java create mode 100644 jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java create mode 100644 jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/FrameParseCapture.java diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/AsyncWebSocketConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/AsyncWebSocketConnection.java index 232fc0f56ca..fa9b77fbcd5 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/AsyncWebSocketConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/AsyncWebSocketConnection.java @@ -1,29 +1,151 @@ package org.eclipse.jetty.websocket.server; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executor; import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.io.StandardByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.ExtensionConfig; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.frames.CloseFrame; +import org.eclipse.jetty.websocket.generator.Generator; +import org.eclipse.jetty.websocket.parser.Parser; +// TODO: implement WebSocket.Connection (for API access)? public class AsyncWebSocketConnection extends AbstractAsyncConnection { - // TODO: track extensions? (only those that need to operate at this level?) - // TODO: track generic WebSocket.Connection (for API access)? + private static final Logger LOG = Log.getLogger(AsyncWebSocketConnection.class); + private static final ThreadLocal CURRENT_CONNECTION = new ThreadLocal(); - public AsyncWebSocketConnection(AsyncEndPoint endp, Executor executor) + public static AsyncWebSocketConnection getCurrentConnection() { - super(endp,executor); + return CURRENT_CONNECTION.get(); } - public AsyncWebSocketConnection(AsyncEndPoint endp, Executor executor, boolean executeOnlyFailure) + protected static void setCurrentConnection(AsyncWebSocketConnection connection) { - super(endp,executor,executeOnlyFailure); + CURRENT_CONNECTION.set(connection); + } + + private final ByteBufferPool bufferPool; + private Generator generator; + private Parser parser; + private WebSocketPolicy policy; + // TODO: track extensions? (only those that need to operate at this level?) + // TODO: are extensions going to layer the endpoint? + // TODO: are extensions going to layer the connection? + private List extensions; + + public AsyncWebSocketConnection(AsyncEndPoint endp, Executor executor, WebSocketPolicy policy) + { + super(endp,executor); + this.policy = policy; + this.bufferPool = new StandardByteBufferPool(policy.getBufferSize()); + this.generator = new Generator(bufferPool,policy); + this.parser = new Parser(policy); + this.extensions = new ArrayList<>(); + } + + private int fill(AsyncEndPoint endPoint, ByteBuffer buffer) + { + try + { + return endPoint.fill(buffer); + } + catch (IOException e) + { + terminateConnection(StatusCode.PROTOCOL,e.getMessage()); + throw new RuntimeIOException(e); + } + } + + /** + * Get the list of extensions in use. + *

+ * This list is negotiated during the WebSocket Upgrade Request/Response handshake. + * + * @return the list of negotiated extensions in use. + */ + public List getExtensions() + { + return extensions; } @Override public void onFillable() { - // TODO Auto-generated method stub + LOG.debug("onFillable"); + setCurrentConnection(this); + ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clear(buffer); + try + { + read(buffer); + } + finally + { + setCurrentConnection(null); + bufferPool.release(buffer); + } + } + private void read(ByteBuffer buffer) + { + while (true) + { + int filled = fill(getEndPoint(),buffer); + if (filled == 0) + { + break; + } + if (filled < 0) + { + // IO error + terminateConnection(StatusCode.PROTOCOL,null); + break; + } + parser.parse(buffer); + } + } + + /** + * Get the list of extensions in use. + *

+ * This list is negotiated during the WebSocket Upgrade Request/Response handshake. + * + * @param extensions + * the list of negotiated extensions in use. + */ + public void setExtensions(List extensions) + { + this.extensions = extensions; + } + + /** + * For terminating connections forcefully. + * + * @param statusCode + * the WebSocket status code. + * @param reason + * the (optiona) reason string. (null is allowed) + * @see StatusCode + */ + private void terminateConnection(short statusCode, String reason) + { + CloseFrame close = new CloseFrame(statusCode); + close.setReason(reason); + + // fire and forget -> close frame + getEndPoint().write(null,new WebSocketCloseCallback(this),generator.generate(close)); } } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketCloseCallback.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketCloseCallback.java new file mode 100644 index 00000000000..bd5599993f1 --- /dev/null +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketCloseCallback.java @@ -0,0 +1,25 @@ +package org.eclipse.jetty.websocket.server; + +import org.eclipse.jetty.util.Callback; + +public class WebSocketCloseCallback implements Callback +{ + private AsyncWebSocketConnection conn; + + public WebSocketCloseCallback(AsyncWebSocketConnection conn) + { + this.conn = conn; + } + + @Override + public void completed(Void context) + { + this.conn.getEndPoint().close(); + } + + @Override + public void failed(Void context, Throwable cause) + { + this.conn.getEndPoint().close(); + } +} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandler.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandler.java index 1ede4f46c60..18e803d9c34 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandler.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandler.java @@ -39,7 +39,7 @@ import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -public abstract class WebSocketHandler extends HandlerWrapper implements WebSocketServerFactory.Acceptor +public abstract class WebSocketHandler extends HandlerWrapper implements WebSocketServer.Acceptor { private final WebSocketServerFactory webSocketFactory; diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHixie76.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHixie76.java deleted file mode 100644 index 529f6be839d..00000000000 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHixie76.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.eclipse.jetty.websocket.server; - -/** - * Hixie-76 Draft for WebSocket protocol - * Seen in use by Safari/OSX - */ -public class WebSocketHixie76 { - /* Put Hixie-76 specifics in here */ -} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketOpenCallback.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketOpenCallback.java new file mode 100644 index 00000000000..9a2cbb00979 --- /dev/null +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketOpenCallback.java @@ -0,0 +1,18 @@ +package org.eclipse.jetty.websocket.server; + +import org.eclipse.jetty.util.FutureCallback; + +public class WebSocketOpenCallback extends FutureCallback +{ + @Override + public void completed(String context) + { + // TODO notify API on connection open + } + + @Override + public void failed(String context, Throwable x) + { + // TODO notify API on open failure + } +} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServer.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServer.java new file mode 100644 index 00000000000..3e1520db267 --- /dev/null +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServer.java @@ -0,0 +1,60 @@ +package org.eclipse.jetty.websocket.server; + +import java.io.IOException; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.extensions.Extension; + +/** + * Main API class for WebSocket servers + */ +public interface WebSocketServer +{ + public static interface Acceptor + { + /** + *

+ * Checks the origin of an incoming WebSocket handshake request. + *

+ * + * @param request + * the incoming HTTP upgrade request + * @param origin + * the origin URI + * @return boolean to indicate that the origin is acceptable. + */ + boolean checkOrigin(HttpServletRequest request, String origin); + + /* ------------------------------------------------------------ */ + /** + *

+ * Factory method that applications needs to implement to return a {@link WebSocket} object. + *

+ * + * @param request + * the incoming HTTP upgrade request + * @param protocol + * the websocket sub protocol + * @return a new {@link WebSocket} object that will handle websocket events. + */ + WebSocket doWebSocketConnect(HttpServletRequest request, String protocol); + } + + public static interface Handshake + { + /** + * Formulate a WebSocket upgrade handshake response. + * + * @param request + * @param response + * @param extensions + * @param acceptedSubProtocol + */ + public void doHandshakeResponse(HttpServletRequest request, HttpServletResponse response, List extensions, String acceptedSubProtocol) + throws IOException; + } +} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index cf75a85dd1f..3fa5d222373 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -15,87 +15,101 @@ package org.eclipse.jetty.websocket.server; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.util.QuotedStringTokenizer; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.api.ExtensionConfig; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.extensions.Extension; import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension; import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension; import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension; +import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76; +import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455; /** * Factory to create WebSocket connections */ public class WebSocketServerFactory extends AbstractLifeCycle { - public interface Acceptor - { - /* ------------------------------------------------------------ */ - /** - *

Checks the origin of an incoming WebSocket handshake request.

- * @param request the incoming HTTP upgrade request - * @param origin the origin URI - * @return boolean to indicate that the origin is acceptable. - */ - boolean checkOrigin(HttpServletRequest request, String origin); - - /* ------------------------------------------------------------ */ - /** - *

Factory method that applications needs to implement to return a - * {@link WebSocket} object.

- * @param request the incoming HTTP upgrade request - * @param protocol the websocket sub protocol - * @return a new {@link WebSocket} object that will handle websocket events. - */ - WebSocket doWebSocketConnect(HttpServletRequest request, String protocol); - } private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class); + private static final int RESPONSE_BUFFER_SIZE = 8192; - private final Queue connections = new ConcurrentLinkedQueue(); + private final Queue connections = new ConcurrentLinkedQueue(); - private final Map> _extensionClasses = new HashMap>(); + // TODO: replace with ExtensionRegistry in websocket-core + private final Map> extensionClasses = new HashMap<>(); { - _extensionClasses.put("identity",IdentityExtension.class); - _extensionClasses.put("fragment",FragmentExtension.class); - _extensionClasses.put("x-deflate-frame",DeflateFrameExtension.class); + extensionClasses.put("identity",IdentityExtension.class); + extensionClasses.put("fragment",FragmentExtension.class); + extensionClasses.put("x-deflate-frame",DeflateFrameExtension.class); } - private final Acceptor _acceptor; + private final Map handshakes = new HashMap<>(); + { + handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455()); + handshakes.put(HandshakeHixie76.VERSION,new HandshakeHixie76()); + } + + private final WebSocketServer.Acceptor acceptor; + private final ByteBufferPool bufferPool; + private final String supportedVersions; private WebSocketPolicy policy; - public WebSocketServerFactory(Acceptor acceptor, WebSocketPolicy policy) + public WebSocketServerFactory(WebSocketServer.Acceptor acceptor, WebSocketPolicy policy) { - this._acceptor = acceptor; + this.acceptor = acceptor; this.policy = policy; + this.bufferPool = new StandardByteBufferPool(RESPONSE_BUFFER_SIZE); + + // Create supportedVersions + List versions = new ArrayList<>(); + for (int v : handshakes.keySet()) + { + versions.add(v); + } + Collections.sort(versions,Collections.reverseOrder()); // newest first + StringBuilder rv = new StringBuilder(); + for (int v : versions) + { + if (rv.length() > 0) + { + rv.append(", "); + } + rv.append(v); + } + supportedVersions = rv.toString(); } - public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) - throws IOException + public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException { if ("websocket".equalsIgnoreCase(request.getHeader("Upgrade"))) { String origin = request.getHeader("Origin"); - if (origin==null) + if (origin == null) { origin = request.getHeader("Sec-WebSocket-Origin"); } - if (!_acceptor.checkOrigin(request,origin)) + if (!acceptor.checkOrigin(request,origin)) { response.sendError(HttpServletResponse.SC_FORBIDDEN); return false; @@ -104,15 +118,14 @@ public class WebSocketServerFactory extends AbstractLifeCycle // Try each requested protocol WebSocket websocket = null; - @SuppressWarnings("unchecked") Enumeration protocols = request.getHeaders("Sec-WebSocket-Protocol"); - String protocol=null; - while ((protocol==null) && (protocols!=null) && protocols.hasMoreElements()) + String protocol = null; + while ((protocol == null) && (protocols != null) && protocols.hasMoreElements()) { String candidate = protocols.nextElement(); for (String p : parseProtocols(candidate)) { - websocket = _acceptor.doWebSocketConnect(request, p); + websocket = acceptor.doWebSocketConnect(request,p); if (websocket != null) { protocol = p; @@ -125,9 +138,9 @@ public class WebSocketServerFactory extends AbstractLifeCycle if (websocket == null) { // Try with no protocol - websocket = _acceptor.doWebSocketConnect(request, null); + websocket = acceptor.doWebSocketConnect(request,null); - if (websocket==null) + if (websocket == null) { response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return false; @@ -135,38 +148,38 @@ public class WebSocketServerFactory extends AbstractLifeCycle } // Send the upgrade - upgrade(request, response, websocket, protocol); + upgrade(request,response,websocket,protocol); return true; } return false; } - - protected boolean addConnection(WebSocketServletConnection connection) + + protected boolean addConnection(AsyncWebSocketConnection connection) { return isRunning() && connections.add(connection); } protected void closeConnections() { - for (WebSocketServletConnection connection : connections) + for (AsyncWebSocketConnection connection : connections) { - // TODO connection.shutdown(); + connection.getEndPoint().close(); } } @Override protected void doStop() throws Exception { - closeConnections(); + closeConnections(); } /** * @return A modifiable map of extension name to extension class */ - public Map> getExtensionClassesMap() + public Map> getExtensionClassesMap() { - return _extensionClasses; + return extensionClasses; } /** @@ -179,34 +192,22 @@ public class WebSocketServerFactory extends AbstractLifeCycle return policy; } - public List initExtensions(List requested,int maxDataOpcodes,int maxControlOpcodes,int maxReservedBits) + public List initExtensions(List requested) { List extensions = new ArrayList(); - for (String rExt : requested) + + for (ExtensionConfig cfg : requested) { - QuotedStringTokenizer tok = new QuotedStringTokenizer(rExt,";"); - String extName=tok.nextToken().trim(); - Map parameters = new HashMap(); - while (tok.hasMoreTokens()) - { - QuotedStringTokenizer nv = new QuotedStringTokenizer(tok.nextToken().trim(),"="); - String name=nv.nextToken().trim(); - String value=nv.hasMoreTokens()?nv.nextToken().trim():null; - parameters.put(name,value); - } + Extension extension = newExtension(cfg.getName()); - Extension extension = newExtension(extName); - - if (extension==null) + if (extension == null) { continue; } - if (extension.init(parameters)) - { - LOG.debug("add {} {}",extName,parameters); - extensions.add(extension); - } + extension.setConfig(cfg); + LOG.debug("added {}",extension); + extensions.add(extension); } LOG.debug("extensions={}",extensions); return extensions; @@ -216,8 +217,8 @@ public class WebSocketServerFactory extends AbstractLifeCycle { try { - Class extClass = _extensionClasses.get(name); - if (extClass!=null) + Class extClass = extensionClasses.get(name); + if (extClass != null) { return extClass.newInstance(); } @@ -234,16 +235,18 @@ public class WebSocketServerFactory extends AbstractLifeCycle { if (protocol == null) { - return new String[]{null}; + return new String[] + { null }; } protocol = protocol.trim(); if ((protocol == null) || (protocol.length() == 0)) { - return new String[]{null}; + return new String[] + { null }; } String[] passed = protocol.split("\\s*,\\s*"); String[] protocols = new String[passed.length + 1]; - System.arraycopy(passed, 0, protocols, 0, passed.length); + System.arraycopy(passed,0,protocols,0,passed.length); return protocols; } @@ -254,87 +257,79 @@ public class WebSocketServerFactory extends AbstractLifeCycle /** * Upgrade the request/response to a WebSocket Connection. - *

This method will not normally return, but will instead throw a - * UpgradeConnectionException, to exit HTTP handling and initiate - * WebSocket handling of the connection. - * - * @param request The request to upgrade - * @param response The response to upgrade - * @param websocket The websocket handler implementation to use - * @param protocol The websocket protocol - * @throws IOException in case of I/O errors + *

+ * This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the + * connection. + * + * @param request + * The request to upgrade + * @param response + * The response to upgrade + * @param websocket + * The websocket handler implementation to use + * @param acceptedSubProtocol + * The accepted websocket sub protocol + * @throws IOException + * in case of I/O errors */ - public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String protocol) - throws IOException + public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String acceptedSubProtocol) throws IOException { if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade"))) { - throw new IllegalStateException("!Upgrade:websocket"); + throw new IllegalStateException("Not a 'WebSocket: Ugprade' request"); } if (!"HTTP/1.1".equals(request.getProtocol())) { - throw new IllegalStateException("!HTTP/1.1"); + throw new IllegalStateException("Not a 'HTTP/1.1' request"); } - int draft = request.getIntHeader("Sec-WebSocket-Version"); - if (draft < 0) { + int version = request.getIntHeader("Sec-WebSocket-Version"); + if (version < 0) + { // Old pre-RFC version specifications (header not present in RFC-6455) - draft = request.getIntHeader("Sec-WebSocket-Draft"); + version = request.getIntHeader("Sec-WebSocket-Draft"); } - HttpConnection http = HttpConnection.getCurrentConnection(); - AsyncEndPoint endp = http.getEndPoint(); - List extensions_requested = new ArrayList(); - @SuppressWarnings("unchecked") + List extensionsRequested = new ArrayList<>(); Enumeration e = request.getHeaders("Sec-WebSocket-Extensions"); while (e.hasMoreElements()) { QuotedStringTokenizer tok = new QuotedStringTokenizer(e.nextElement(),","); while (tok.hasMoreTokens()) { - extensions_requested.add(tok.nextToken()); + extensionsRequested.add(ExtensionConfig.parse(tok.nextToken())); } } - final WebSocketServletConnection connection; - switch (draft) + WebSocketServer.Handshake handshaker = handshakes.get(version); + if (handshaker == null) { - case org.eclipse.jetty.websocket.api.WebSocket.VERSION: // RFC 6455 Version - { -// List extensions = initExtensions(extensions_requested, -// 8 - WebSocketConnectionRFC6455.OP_EXT_DATA, -// 16 - WebSocketConnectionRFC6455.OP_EXT_CTRL, -// 3); -// connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft); - break; - } - default: - { - LOG.warn("Unsupported Websocket version: " + draft); - // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol - // Using the examples as outlined - response.setHeader("Sec-WebSocket-Version", "" + org.eclipse.jetty.websocket.api.WebSocket.VERSION /*+ ", 0"*/); - response.setStatus(HttpStatus.BAD_REQUEST_400); - return; - } + LOG.warn("Unsupported Websocket version: " + version); + // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol + // Using the examples as outlined + response.setHeader("Sec-WebSocket-Version",supportedVersions); + response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification"); + return; } - // addConnection(connection); + // Create connection + HttpConnection http = HttpConnection.getCurrentConnection(); + AsyncEndPoint endp = http.getEndPoint(); + Executor executor = http.getConnector().findExecutor(); + final AsyncWebSocketConnection connection = new AsyncWebSocketConnection(endp,executor,policy); + endp.setAsyncConnection(connection); - // Set the defaults - // connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize); - // connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize); + // Initialize / Negotiate Extensions + List extensions = initExtensions(extensionsRequested); - // Let the connection finish processing the handshake - // connection.handshake(request, response, protocol); - response.flushBuffer(); + // Process (version specific) handshake response + handshaker.doHandshakeResponse(request,response,extensions,acceptedSubProtocol); - // Give the connection any unused data from the HTTP connection. - // connection.fillBuffersFrom(((HttpParser)http.getParser()).getHeaderBuffer()); - // connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer()); + // Add connection + addConnection(connection); // Tell jetty about the new connection - // LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection); - // request.setAttribute("org.eclipse.jetty.io.Connection", connection); + LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,acceptedSubProtocol,connection); + request.setAttribute("org.eclipse.jetty.io.Connection",connection); // TODO: this still needed? } } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServlet.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServlet.java index c70e9461118..35ad02b5606 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServlet.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServlet.java @@ -29,25 +29,58 @@ import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketPolicy; /** - * Servlet to upgrade connections to WebSocket - *

- * The request must have the correct upgrade headers, else it is - * handled as a normal servlet request. - *

- * The initParameter "bufferSize" can be used to set the buffer size, - * which is also the max frame byte size (default 8192). - *

- * The initParameter "maxIdleTime" can be used to set the time in ms - * that a websocket may be idle before closing. - *

- * The initParameter "maxTextMessagesSize" can be used to set the size in characters - * that a websocket may be accept before closing. - *

- * The initParameter "maxBinaryMessagesSize" can be used to set the size in bytes - * that a websocket may be accept before closing. + * Abstract Servlet used to bridge the Servlet API to the WebSocket API. + *

+ * This servlet implements the {@link WebSocketServer.Acceptor}, with a default implementation of + * {@link WebSocketServer.Acceptor#checkOrigin(HttpServletRequest, String)} leaving you to implement the + * {@link WebSocketServer.Acceptor#doWebSocketConnect(HttpServletRequest, String)}. + *

+ * The most basic implementation would be as follows. + * + *

+ * package my.example;
+ * 
+ * import javax.servlet.http.HttpServletRequest;
+ * import org.eclipse.jetty.websocket.WebSocket;
+ * import org.eclipse.jetty.websocket.server.WebSocketServlet;
+ * 
+ * public class MyEchoServlet extends WebSocketServlet
+ * {
+ *     @Override
+ *     public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
+ *     {
+ *         return new MyEchoSocket();
+ *     }
+ * }
+ * 
+ * + * Note: this servlet will only forward on a incoming request that hits this servlet to the + * {@link WebSocketServer.Acceptor#doWebSocketConnect(HttpServletRequest, String)} if it conforms to a "WebSocket: Upgrade" handshake request.
+ * All other requests are treated as normal servlet requets. + * + *

+ * Configuration / Init-Parameters: + * + *

+ *
bufferSize
+ *
can be used to set the buffer size, which is also the max frame byte size
+ * Default: 8192
+ * + *
maxIdleTime
+ *
set the time in ms that a websocket may be idle before closing
+ * Default:
+ * + *
maxTextMessagesSize
+ *
set the size in characters that a websocket may be accept before closing
+ * Default:
+ * + *
maxBinaryMessagesSize
+ *
set the size in bytes that a websocket may be accept before closing
+ * Default:
+ *
*/ @SuppressWarnings("serial") -public abstract class WebSocketServlet extends HttpServlet implements WebSocketServerFactory.Acceptor +public abstract class WebSocketServlet extends HttpServlet implements WebSocketServer.Acceptor { private final Logger LOG = Log.getLogger(getClass()); private WebSocketServerFactory webSocketFactory; @@ -81,22 +114,26 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketS { String bs = getInitParameter("bufferSize"); WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER); - if(bs != null) { + if (bs != null) + { policy.setBufferSize(Integer.parseInt(bs)); } String max = getInitParameter("maxIdleTime"); - if (max != null) { + if (max != null) + { policy.setMaxIdleTime(Integer.parseInt(max)); } max = getInitParameter("maxTextMessageSize"); - if (max != null) { + if (max != null) + { policy.setMaxTextMessageSize(Integer.parseInt(max)); } max = getInitParameter("maxBinaryMessageSize"); - if (max != null) { + if (max != null) + { policy.setMaxBinaryMessageSize(Integer.parseInt(max)); } @@ -118,6 +155,6 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketS { return; } - super.service(request, response); + super.service(request,response); } } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeHixie76.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeHixie76.java new file mode 100644 index 00000000000..3862634d48f --- /dev/null +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeHixie76.java @@ -0,0 +1,29 @@ +package org.eclipse.jetty.websocket.server.handshake; + +import java.io.IOException; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.websocket.extensions.Extension; +import org.eclipse.jetty.websocket.server.WebSocketServer; + +/** + * WebSocket Handshake for spec Hixie-76 Draft. + *

+ * Most often seen in use by Safari/OSX + */ +public class HandshakeHixie76 implements WebSocketServer.Handshake +{ + /** draft-hixie-thewebsocketprotocol-76 - Sec-WebSocket-Draft */ + public static final int VERSION = 0; + + @Override + public void doHandshakeResponse(HttpServletRequest request, HttpServletResponse response, List extensions, String acceptedSubProtocol) + throws IOException + { + // TODO: implement the Hixie76 handshake? + throw new IOException("Not implemented yet"); + } +} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeRFC6455.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeRFC6455.java new file mode 100644 index 00000000000..8307050c171 --- /dev/null +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/handshake/HandshakeRFC6455.java @@ -0,0 +1,50 @@ +package org.eclipse.jetty.websocket.server.handshake; + +import java.util.List; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.websocket.api.AcceptHash; +import org.eclipse.jetty.websocket.extensions.Extension; +import org.eclipse.jetty.websocket.server.WebSocketServer; + +/** + * WebSocket Handshake for RFC 6455. + */ +public class HandshakeRFC6455 implements WebSocketServer.Handshake +{ + /** RFC 6455 - Sec-WebSocket-Version */ + public static final int VERSION = 13; + + @Override + public void doHandshakeResponse(HttpServletRequest request, HttpServletResponse response, List extensions, String acceptedSubProtocol) + { + String key = request.getHeader("Sec-WebSocket-Key"); + + if (key == null) + { + throw new IllegalStateException("Missing request header 'Sec-WebSocket-Key'"); + } + + // build response + response.setHeader("Upgrade","WebSocket"); + response.addHeader("Connection","Upgrade"); + response.addHeader("Sec-WebSocket-Accept",AcceptHash.hashKey(key)); + + if (acceptedSubProtocol != null) + { + response.addHeader("Sec-WebSocket-Protocol",acceptedSubProtocol); + } + + if (extensions != null) + { + for (Extension ext : extensions) + { + response.addHeader("Sec-WebSocket-Extensions",ext.getConfig().getParameterizedName()); + } + } + + response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java index b6ba1bd68c0..8591a90ccfc 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java @@ -11,29 +11,37 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.URI; -import java.util.concurrent.TimeUnit; +import java.nio.ByteBuffer; import javax.servlet.http.HttpServletRequest; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketGeneratorRFC6455Test; -import org.eclipse.jetty.websocket.server.helper.MessageSender; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.frames.CloseFrame; +import org.eclipse.jetty.websocket.frames.TextFrame; +import org.eclipse.jetty.websocket.generator.Generator; +import org.eclipse.jetty.websocket.parser.Parser; +import org.eclipse.jetty.websocket.server.helper.FrameParseCapture; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; /** - * Test various RFC 6455 specified requirements placed on - * {@link WebSocketServlet} + * Test various RFC 6455 specified requirements placed on {@link WebSocketServlet} *

- * This test serves a different purpose than than the {@link WebSocketGeneratorRFC6455Test}, - * {@link WebSocketMessageRFC6455Test}, and {@link WebSocketParserRFC6455Test} tests. + * This test serves a different purpose than than the {@link WebSocketGeneratorRFC6455Test}, {@link WebSocketMessageRFC6455Test}, and + * {@link WebSocketParserRFC6455Test} tests. */ public class WebSocketServletRFCTest { @@ -64,6 +72,7 @@ public class WebSocketServletRFCTest // trigger a WebSocket server terminated close. if (data.equals("CRASH")) { + System.out.printf("Got OnTextMessage"); throw new RuntimeException("Something bad happened"); } @@ -131,6 +140,14 @@ public class WebSocketServletRFCTest } } + private void read(InputStream in, ByteBuffer buf) throws IOException + { + while ((in.available() > 0) && (buf.remaining() > 0)) + { + buf.put((byte)in.read()); + } + } + private String readResponseHeader(InputStream in) throws IOException { InputStreamReader isr = new InputStreamReader(in); @@ -153,34 +170,79 @@ public class WebSocketServletRFCTest } /** - * Test the requirement of responding with server terminated close code 1011 when there is an unhandled (internal - * server error) being produced by the extended WebSocketServlet. + * Test the requirement of responding with server terminated close code 1011 when there is an unhandled (internal server error) being produced by the + * extended WebSocketServlet. */ @Test public void testResponseOnInternalError() throws Exception { - // WebSocketClientFactory clientFactory = new WebSocketClientFactory(); - // clientFactory.start(); + Socket socket = new Socket(); + SocketAddress endpoint = new InetSocketAddress(serverUri.getHost(),serverUri.getPort()); + socket.connect(endpoint); - // WebSocketClient wsc = clientFactory.newWebSocketClient(); - MessageSender sender = new MessageSender(); - // wsc.open(serverUri,sender); + // acting as client + WebSocketPolicy policy = WebSocketPolicy.newClientPolicy(); + ByteBufferPool bufferPool = new StandardByteBufferPool(policy.getBufferSize()); + Generator generator = new Generator(bufferPool,policy); + Parser parser = new Parser(policy); + FrameParseCapture capture = new FrameParseCapture(); + parser.addListener(capture); + StringBuilder req = new StringBuilder(); + req.append("GET / HTTP/1.1\r\n"); + req.append(String.format("Host: %s:%d\r\n",serverUri.getHost(),serverUri.getPort())); + req.append("Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"); + req.append("Upgrade: WebSocket\r\n"); + req.append("Connection: Upgrade\r\n"); + req.append("Sec-WebSocket-Version: 13\r\n"); // RFC 6455 + req.append("\r\n"); + + OutputStream out = null; + InputStream in = null; try { - sender.awaitConnect(); + out = socket.getOutputStream(); + in = socket.getInputStream(); - sender.sendMessage("CRASH"); + // Write request + out.write(req.toString().getBytes()); + out.flush(); - // Give servlet 500 millisecond to process messages - TimeUnit.MILLISECONDS.sleep(500); + // Read response header + String respHeader = readResponseHeader(in); + // System.out.println("RESPONSE: " + respHeader); - Assert.assertThat("WebSocket should be closed",sender.isConnected(),is(false)); - Assert.assertThat("WebSocket close clode",sender.getCloseCode(),is(1011)); + Assert.assertThat("Response Code",respHeader,startsWith("HTTP/1.1 101 Switching Protocols")); + Assert.assertThat("Response Header Upgrade",respHeader,containsString("Upgrade: WebSocket\r\n")); + // Assert.assertThat("Response Header Connection",respHeader,containsString("Connection: Upgrade\r\n")); + + // Generate text frame + TextFrame txt = new TextFrame("CRASH"); + ByteBuffer txtbuf = generator.generate(txt); + txtbuf.flip(); + + // Write Text Frame + BufferUtil.writeTo(txtbuf,out); + + // Read frame (hopefully close frame) + ByteBuffer closeFrame = ByteBuffer.allocate(20); + System.out.println("Reading from in"); + read(in,closeFrame); + + // Parse Frame + parser.parse(closeFrame); + + capture.assertNoErrors(); + capture.assertHasFrame(CloseFrame.class,1); + + CloseFrame cf = (CloseFrame)capture.getFrames().get(0); + Assert.assertThat("Close Frame.status code",cf.getStatusCode(),is(StatusCode.SERVER_ERROR)); } finally { - sender.close(); + IO.close(in); + IO.close(out); + socket.close(); } } @@ -190,9 +252,6 @@ public class WebSocketServletRFCTest @Test public void testResponseOnInvalidVersion() throws Exception { - // Using straight Socket to accomplish this as jetty's WebSocketClient - // doesn't allow the use of invalid versions. (obviously) - Socket socket = new Socket(); SocketAddress endpoint = new InetSocketAddress(serverUri.getHost(),serverUri.getPort()); socket.connect(endpoint); @@ -221,7 +280,7 @@ public class WebSocketServletRFCTest // System.out.println("RESPONSE: " + respHeader); Assert.assertThat("Response Code",respHeader,startsWith("HTTP/1.1 400 Unsupported websocket version specification")); - Assert.assertThat("Response Header Versions",respHeader,containsString("Sec-WebSocket-Version: 13, 8, 6, 0\r\n")); + Assert.assertThat("Response Header Versions",respHeader,containsString("Sec-WebSocket-Version: 13, 0\r\n")); } finally { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoServlet.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoServlet.java new file mode 100644 index 00000000000..ac72653d477 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoServlet.java @@ -0,0 +1,19 @@ +package org.eclipse.jetty.websocket.server.examples; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.server.WebSocketServlet; + +/** + * Example servlet for most basic form. + */ +@SuppressWarnings("serial") +public class MyEchoServlet extends WebSocketServlet +{ + @Override + public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) + { + return new MyEchoSocket(); + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java new file mode 100644 index 00000000000..658273d934f --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java @@ -0,0 +1,36 @@ +package org.eclipse.jetty.websocket.server.examples; + +import java.io.IOException; + +import org.eclipse.jetty.websocket.WebSocket; + +public class MyEchoSocket implements WebSocket, WebSocket.OnTextMessage +{ + private Connection conn; + + @Override + public void onClose(int closeCode, String message) + { + /* do nothing */ + } + + @Override + public void onMessage(String data) + { + try + { + // echo the data back + conn.sendMessage(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + @Override + public void onOpen(Connection connection) + { + this.conn = connection; + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/FrameParseCapture.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/FrameParseCapture.java new file mode 100644 index 00000000000..82ecc7fdafe --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/FrameParseCapture.java @@ -0,0 +1,94 @@ +package org.eclipse.jetty.websocket.server.helper; + +import static org.hamcrest.Matchers.*; + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.frames.BaseFrame; +import org.eclipse.jetty.websocket.parser.Parser; +import org.junit.Assert; + +public class FrameParseCapture implements Parser.Listener +{ + private static final Logger LOG = Log.getLogger(FrameParseCapture.class); + private List frames = new ArrayList<>(); + private List errors = new ArrayList<>(); + + public void assertHasErrors(Class errorType, int expectedCount) + { + Assert.assertThat(errorType.getSimpleName(),getErrorCount(errorType),is(expectedCount)); + } + + public void assertHasFrame(Class frameType) + { + Assert.assertThat(frameType.getSimpleName(),getFrameCount(frameType),greaterThanOrEqualTo(1)); + } + + public void assertHasFrame(Class frameType, int expectedCount) + { + Assert.assertThat(frameType.getSimpleName(),getFrameCount(frameType),is(expectedCount)); + } + + public void assertHasNoFrames() + { + Assert.assertThat("Has no frames",frames.size(),is(0)); + } + + public void assertNoErrors() + { + Assert.assertThat("Has no errors",errors.size(),is(0)); + } + + public int getErrorCount(Class errorType) + { + int count = 0; + for (WebSocketException error : errors) + { + if (errorType.isInstance(error)) + { + count++; + } + } + return count; + } + + public List getErrors() + { + return errors; + } + + public int getFrameCount(Class frameType) + { + int count = 0; + for (BaseFrame frame : frames) + { + if (frameType.isInstance(frame)) + { + count++; + } + } + return count; + } + + public List getFrames() + { + return frames; + } + + @Override + public void onFrame(BaseFrame frame) + { + frames.add(frame); + } + + @Override + public void onWebSocketException(WebSocketException e) + { + LOG.warn(e); + errors.add(e); + } +}