367502 - WebSocket connections should be closed when application context is stopped.

This commit is contained in:
Simone Bordet 2011-12-23 17:20:57 +01:00
parent 371e998b62
commit 62f47e0617
12 changed files with 302 additions and 73 deletions

View File

@ -28,4 +28,6 @@ public interface WebSocketConnection extends AsyncConnection
List<Extension> getExtensions();
WebSocket.Connection getConnection();
void shutdown();
}

View File

@ -293,6 +293,11 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}
}
public void shutdown()
{
close();
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
{

View File

@ -274,6 +274,13 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
}
}
public void shutdown()
{
final WebSocket.Connection connection = _connection;
if (connection != null)
connection.close(CLOSE_SHUTDOWN, null);
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
{

View File

@ -370,6 +370,13 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
}
}
public void shutdown()
{
final WebSocket.Connection connection = _connection;
if (connection != null)
connection.close(CLOSE_SHUTDOWN, null);
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
{

View File

@ -400,6 +400,13 @@ public class WebSocketConnectionRFC6455 extends AbstractConnection implements We
}
}
public void shutdown()
{
final WebSocket.Connection connection = _connection;
if (connection != null)
connection.close(CLOSE_SHUTDOWN, null);
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
{
@ -431,7 +438,7 @@ public class WebSocketConnectionRFC6455 extends AbstractConnection implements We
/* ------------------------------------------------------------ */
private class WSFrameConnection implements WebSocket.FrameConnection
{
volatile boolean _disconnecting;
private volatile boolean _disconnecting;
/* ------------------------------------------------------------ */
public void sendMessage(String content) throws IOException

View File

@ -30,12 +30,12 @@ package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -45,15 +45,17 @@ import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.eclipse.jetty.server.BlockingHttpConnection;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Factory to create WebSocket connections
*/
public class WebSocketFactory
public class WebSocketFactory extends AbstractLifeCycle
{
private static final Logger LOG = Log.getLogger(WebSocketFactory.class);
private final Queue<WebSocketServletConnection> connections = new ConcurrentLinkedQueue<WebSocketServletConnection>();
public interface Acceptor
{
@ -87,21 +89,15 @@ public class WebSocketFactory
private final Acceptor _acceptor;
private WebSocketBuffers _buffers;
private int _maxIdleTime = 300000;
private int _maxTextMessageSize = 16*1024;
private int _maxTextMessageSize = 16 * 1024;
private int _maxBinaryMessageSize = -1;
public WebSocketFactory(Acceptor acceptor)
{
this(acceptor, 64 * 1024);
}
public WebSocketFactory(Acceptor acceptor, int bufferSize)
{
_buffers = new WebSocketBuffers(bufferSize);
_acceptor = acceptor;
}
/**
* @return A modifiable map of extension name to extension class
*/
@ -187,6 +183,12 @@ public class WebSocketFactory
_maxBinaryMessageSize = maxBinaryMessageSize;
}
@Override
protected void doStop() throws Exception
{
closeConnections();
}
/**
* Upgrade the request/response to a WebSocket Connection.
* <p>This method will not normally return, but will instead throw a
@ -230,44 +232,53 @@ public class WebSocketFactory
}
final WebSocketServletConnection connection;
final List<Extension> extensions;
switch (draft)
{
case -1: // unspecified draft/version
case 0: // Old school draft/version
extensions=Collections.emptyList();
connection = new WebSocketServletConnectionD00(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
{
connection = new WebSocketServletConnectionD00(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break;
}
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
extensions=Collections.emptyList();
connection = new WebSocketServletConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
{
connection = new WebSocketServletConnectionD06(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break;
}
case 7:
case 8:
extensions= initExtensions(extensions_requested,8-WebSocketConnectionD08.OP_EXT_DATA, 16-WebSocketConnectionD08.OP_EXT_CTRL,3);
connection = new WebSocketServletConnectionD08(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol,extensions,draft);
{
List<Extension> extensions = initExtensions(extensions_requested, 8 - WebSocketConnectionD08.OP_EXT_DATA, 16 - WebSocketConnectionD08.OP_EXT_CTRL, 3);
connection = new WebSocketServletConnectionD08(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
break;
}
case WebSocketConnectionRFC6455.VERSION: // RFC 6455 Version
extensions= initExtensions(extensions_requested,8-WebSocketConnectionRFC6455.OP_EXT_DATA, 16-WebSocketConnectionRFC6455.OP_EXT_CTRL,3);
connection = new WebSocketServletConnectionRFC6455(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol,extensions,draft);
{
List<Extension> extensions = initExtensions(extensions_requested, 8 - WebSocketConnectionRFC6455.OP_EXT_DATA, 16 - WebSocketConnectionRFC6455.OP_EXT_CTRL, 3);
connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
break;
}
default:
LOG.warn("Unsupported Websocket version: "+draft);
{
LOG.warn("Unsupported Websocket version: " + draft);
// Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
// Using the examples as outlined
response.setHeader("Sec-WebSocket-Version","13, 8, 6, 0");
response.setHeader("Sec-WebSocket-Version", "13, 8, 6, 0");
throw new HttpException(400, "Unsupported websocket version specification: " + draft);
}
}
// Set the defaults
connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
addConnection(connection);
// Let the connection finish processing the handshake
connection.handshake(request, response, protocol);
response.flushBuffer();
@ -281,8 +292,6 @@ public class WebSocketFactory
request.setAttribute("org.eclipse.jetty.io.Connection", connection);
}
/**
*/
protected String[] parseProtocols(String protocol)
{
if (protocol == null)
@ -296,8 +305,6 @@ public class WebSocketFactory
return protocols;
}
/**
*/
public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response)
throws IOException
{
@ -353,8 +360,6 @@ public class WebSocketFactory
return false;
}
/**
*/
public List<Extension> initExtensions(List<String> requested,int maxDataOpcodes,int maxControlOpcodes,int maxReservedBits)
{
List<Extension> extensions = new ArrayList<Extension>();
@ -386,8 +391,6 @@ public class WebSocketFactory
return extensions;
}
/**
*/
private Extension newExtension(String name)
{
try
@ -403,4 +406,20 @@ public class WebSocketFactory
return null;
}
protected boolean addConnection(WebSocketServletConnection connection)
{
return isRunning() && connections.add(connection);
}
protected boolean removeConnection(WebSocketServletConnection connection)
{
return connections.remove(connection);
}
protected void closeConnections()
{
for (WebSocketServletConnection connection : connections)
connection.shutdown();
}
}

View File

@ -34,31 +34,33 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/**
* Servlet to upgrade connections to WebSocket
* <p>
* <p/>
* The request must have the correct upgrade headers, else it is
* handled as a normal servlet request.
* <p>
* <p/>
* The initParameter "bufferSize" can be used to set the buffer size,
* which is also the max frame byte size (default 8192).
* <p>
* <p/>
* The initParameter "maxIdleTime" can be used to set the time in ms
* that a websocket may be idle before closing.
* <p>
* <p/>
* The initParameter "maxTextMessagesSize" can be used to set the size in characters
* that a websocket may be accept before closing.
* <p>
* <p/>
* The initParameter "maxBinaryMessagesSize" can be used to set the size in bytes
* that a websocket may be accept before closing.
*
*/
@SuppressWarnings("serial")
public abstract class WebSocketServlet extends HttpServlet implements WebSocketFactory.Acceptor
{
WebSocketFactory _webSocketFactory;
private final Logger LOG = Log.getLogger(getClass());
private WebSocketFactory _webSocketFactory;
/* ------------------------------------------------------------ */
/**
@ -67,20 +69,32 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF
@Override
public void init() throws ServletException
{
String bs=getInitParameter("bufferSize");
_webSocketFactory = new WebSocketFactory(this,bs==null?8192:Integer.parseInt(bs));
String max=getInitParameter("maxIdleTime");
if (max!=null)
_webSocketFactory.setMaxIdleTime(Integer.parseInt(max));
try
{
String bs = getInitParameter("bufferSize");
_webSocketFactory = new WebSocketFactory(this, bs == null ? 8192 : Integer.parseInt(bs));
_webSocketFactory.start();
max=getInitParameter("maxTextMessageSize");
if (max!=null)
_webSocketFactory.setMaxTextMessageSize(Integer.parseInt(max));
String max = getInitParameter("maxIdleTime");
if (max != null)
_webSocketFactory.setMaxIdleTime(Integer.parseInt(max));
max=getInitParameter("maxBinaryMessageSize");
if (max!=null)
_webSocketFactory.setMaxBinaryMessageSize(Integer.parseInt(max));
max = getInitParameter("maxTextMessageSize");
if (max != null)
_webSocketFactory.setMaxTextMessageSize(Integer.parseInt(max));
max = getInitParameter("maxBinaryMessageSize");
if (max != null)
_webSocketFactory.setMaxBinaryMessageSize(Integer.parseInt(max));
}
catch (ServletException x)
{
throw x;
}
catch (Exception x)
{
throw new ServletException(x);
}
}
/* ------------------------------------------------------------ */
@ -90,9 +104,9 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (_webSocketFactory.acceptWebSocket(request,response) || response.isCommitted())
if (_webSocketFactory.acceptWebSocket(request, response) || response.isCommitted())
return;
super.service(request,response);
super.service(request, response);
}
/* ------------------------------------------------------------ */
@ -101,6 +115,17 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF
return true;
}
/* ------------------------------------------------------------ */
@Override
public void destroy()
{
try
{
_webSocketFactory.stop();
}
catch (Exception x)
{
LOG.ignore(x);
}
}
}

View File

@ -25,10 +25,13 @@ import org.eclipse.jetty.util.QuotedStringTokenizer;
public class WebSocketServletConnectionD00 extends WebSocketConnectionD00 implements WebSocketServletConnection
{
public WebSocketServletConnectionD00(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
private final WebSocketFactory factory;
public WebSocketServletConnectionD00(WebSocketFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol);
this.factory = factory;
}
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
@ -70,7 +73,7 @@ public class WebSocketServletConnectionD00 extends WebSocketConnectionD00 implem
{
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
}
response.sendError(101,"WebSocket Protocol Handshake");
response.sendError(101, "WebSocket Protocol Handshake");
}
else
{
@ -89,4 +92,11 @@ public class WebSocketServletConnectionD00 extends WebSocketConnectionD00 implem
onWebsocketOpen();
}
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}

View File

@ -23,10 +23,13 @@ import org.eclipse.jetty.io.EndPoint;
public class WebSocketServletConnectionD06 extends WebSocketConnectionD06 implements WebSocketServletConnection
{
public WebSocketServletConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
private final WebSocketFactory factory;
public WebSocketServletConnectionD06(WebSocketFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol);
this.factory = factory;
}
/* ------------------------------------------------------------ */
@ -47,4 +50,11 @@ public class WebSocketServletConnectionD06 extends WebSocketConnectionD06 implem
onFrameHandshake();
onWebSocketOpen();
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}

View File

@ -24,16 +24,13 @@ import org.eclipse.jetty.io.EndPoint;
public class WebSocketServletConnectionD08 extends WebSocketConnectionD08 implements WebSocketServletConnection
{
public WebSocketServletConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft, MaskGen maskgen) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,maskgen);
}
private final WebSocketFactory factory;
public WebSocketServletConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
public WebSocketServletConnectionD08(WebSocketFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft);
this.factory = factory;
}
/* ------------------------------------------------------------ */
@ -59,4 +56,11 @@ public class WebSocketServletConnectionD08 extends WebSocketConnectionD08 implem
onFrameHandshake();
onWebSocketOpen();
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}

View File

@ -24,16 +24,13 @@ import org.eclipse.jetty.io.EndPoint;
public class WebSocketServletConnectionRFC6455 extends WebSocketConnectionRFC6455 implements WebSocketServletConnection
{
public WebSocketServletConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft, MaskGen maskgen) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,maskgen);
}
private final WebSocketFactory factory;
public WebSocketServletConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
public WebSocketServletConnectionRFC6455(WebSocketFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft);
this.factory = factory;
}
/* ------------------------------------------------------------ */
@ -59,4 +56,11 @@ public class WebSocketServletConnectionRFC6455 extends WebSocketConnectionRFC645
onFrameHandshake();
onWebSocketOpen();
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}

View File

@ -0,0 +1,129 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class WebSocketRedeployTest
{
private Server server;
private ServletContextHandler context;
private String uri;
private WebSocketClientFactory wsFactory;
public void init(final WebSocket webSocket) throws Exception
{
server = new Server();
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(8080);
server.addConnector(connector);
HandlerCollection handlers = new HandlerCollection();
server.setHandler(handlers);
String contextPath = "/test_context";
context = new ServletContextHandler(handlers, contextPath, ServletContextHandler.SESSIONS);
WebSocketServlet servlet = new WebSocketServlet()
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return webSocket;
}
};
String servletPath = "/websocket";
context.addServlet(new ServletHolder(servlet), servletPath);
server.start();
uri = "ws://localhost:" + connector.getLocalPort() + contextPath + servletPath;
wsFactory = new WebSocketClientFactory();
wsFactory.start();
}
@After
public void destroy() throws Exception
{
if (wsFactory != null)
wsFactory.stop();
if (server != null)
{
server.stop();
server.join();
}
}
@Test
public void testStoppingContextClosesConnections() throws Exception
{
final CountDownLatch openLatch = new CountDownLatch(2);
final CountDownLatch closeLatch = new CountDownLatch(2);
init(new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
openLatch.countDown();
}
public void onMessage(String data)
{
}
public void onClose(int closeCode, String message)
{
closeLatch.countDown();
}
});
WebSocketClient client = wsFactory.newWebSocketClient();
client.open(new URI(uri), new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
openLatch.countDown();
}
public void onMessage(String data)
{
}
public void onClose(int closeCode, String message)
{
closeLatch.countDown();
}
}, 5, TimeUnit.SECONDS);
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));
context.stop();
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
}