Reworking EventDriver to be mroe generic and not use reflection in 100% of cases

This commit is contained in:
Joakim Erdfelt 2012-09-26 13:43:17 -07:00
parent be9d24584c
commit cda1c4588f
36 changed files with 860 additions and 676 deletions

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
public interface WebSocketClient
{
@ -45,7 +45,7 @@ public interface WebSocketClient
public UpgradeResponse getUpgradeResponse();
public WebSocketEventDriver getWebSocket();
public EventDriver getWebSocket();
public URI getWebSocketUri();

View File

@ -39,10 +39,10 @@ import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
public class WebSocketClientFactory extends ContainerLifeCycle
@ -52,7 +52,7 @@ public class WebSocketClientFactory extends ContainerLifeCycle
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Executor executor;
private final Scheduler scheduler;
private final EventMethodsCache methodsCache;
private final EventDriverFactory eventDriverFactory;
private final WebSocketPolicy policy;
private final WebSocketExtensionRegistry extensionRegistry;
private SocketAddress bindAddress;
@ -103,7 +103,7 @@ public class WebSocketClientFactory extends ContainerLifeCycle
this.connectionManager = new ConnectionManager(bufferPool,executor,scheduler,sslContextFactory,policy);
addBean(this.connectionManager);
this.methodsCache = new EventMethodsCache();
this.eventDriverFactory = new EventDriverFactory(policy);
}
public WebSocketClientFactory(SslContextFactory sslContextFactory)
@ -189,7 +189,7 @@ public class WebSocketClientFactory extends ContainerLifeCycle
public WebSocketClient newWebSocketClient(Object websocketPojo)
{
LOG.debug("Creating new WebSocket for {}",websocketPojo);
WebSocketEventDriver websocket = new WebSocketEventDriver(websocketPojo,methodsCache,policy,getBufferPool());
EventDriver websocket = eventDriverFactory.wrap(websocketPojo);
return new IWebSocketClient(this,websocket);
}

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
/**
* WebSocketClient for working with Upgrade (request and response), and establishing connections to the websocket URI of your choice.
@ -44,7 +44,7 @@ public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements
private final WebSocketClientFactory factory;
private final WebSocketPolicy policy;
private final WebSocketEventDriver websocket;
private final EventDriver websocket;
private URI websocketUri;
/**
* The abstract WebSocketConnection in use and established for this client.
@ -57,7 +57,7 @@ public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements
private ClientUpgradeResponse upgradeResponse;
private Masker masker;
public IWebSocketClient(WebSocketClientFactory factory, WebSocketEventDriver websocket)
public IWebSocketClient(WebSocketClientFactory factory, EventDriver websocket)
{
this.factory = factory;
LOG.debug("factory.isRunning(): {}",factory.isRunning());
@ -206,7 +206,7 @@ public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getWebSocket()
*/
@Override
public WebSocketEventDriver getWebSocket()
public EventDriver getWebSocket()
{
return websocket;
}

View File

@ -43,10 +43,10 @@ import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.UpgradeException;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.protocol.AcceptHash;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
@ -216,7 +216,7 @@ public class UpgradeConnection extends AbstractConnection
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,client);
// Initialize / Negotiate Extensions
WebSocketEventDriver websocket = client.getWebSocket();
EventDriver websocket = client.getWebSocket();
WebSocketPolicy policy = client.getPolicy();
String acceptedSubProtocol = response.getAcceptedSubProtocol();
WebSocketSession session = new WebSocketSession(websocket,connection,policy,acceptedSubProtocol);

View File

@ -1,342 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.CloseException;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.message.MessageAppender;
import org.eclipse.jetty.websocket.core.io.message.MessageInputStream;
import org.eclipse.jetty.websocket.core.io.message.MessageReader;
import org.eclipse.jetty.websocket.core.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.core.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.Frame;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Responsible for routing the internally generated events destined for a specific WebSocket instance to whatever choice of development style the developer has
* used to wireup their specific WebSocket implementation.
* <p>
* Supports WebSocket instances that either implement {@link WebSocketListener} or have used the {@link WebSocket &#064;WebSocket} annotation.
* <p>
* There will be an instance of the WebSocketEventDriver per connection.
*/
public class WebSocketEventDriver implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketEventDriver.class);
private final Logger socketLog;
private final Object websocket;
private final WebSocketPolicy policy;
private final EventMethods events;
private final ByteBufferPool bufferPool;
private WebSocketSession session;
private MessageAppender activeMessage;
/**
* Establish the driver for the Websocket POJO
*
* @param websocket
*/
public WebSocketEventDriver(Object websocket, EventMethodsCache methodsCache, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
this.policy = policy;
this.websocket = websocket;
this.events = methodsCache.getMethods(websocket.getClass());
this.bufferPool = bufferPool;
this.socketLog = Log.getLogger(websocket.getClass());
if (events.isAnnotated())
{
WebSocket anno = websocket.getClass().getAnnotation(WebSocket.class);
// Setup the policy
if (anno.maxBufferSize() > 0)
{
this.policy.setBufferSize(anno.maxBufferSize());
}
if (anno.maxBinarySize() > 0)
{
this.policy.setMaxBinaryMessageSize(anno.maxBinarySize());
}
if (anno.maxTextSize() > 0)
{
this.policy.setMaxTextMessageSize(anno.maxTextSize());
}
if (anno.maxIdleTime() > 0)
{
this.policy.setIdleTimeout(anno.maxIdleTime());
}
}
}
public WebSocketPolicy getPolicy()
{
return policy;
}
/**
* Get the Websocket POJO in use
*
* @return the Websocket POJO
*/
public Object getWebSocketObject()
{
return websocket;
}
@Override
public void incoming(WebSocketException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.incoming({})",websocket.getClass().getSimpleName(),e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
if (events.onException != null)
{
events.onException.call(websocket,session,e);
}
}
/**
* Internal entry point for incoming frames
*
* @param frame
* the frame that appeared
*/
@Override
public void incoming(WebSocketFrame frame)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
}
// Generic Read-Only Frame version
if ((frame instanceof Frame) && (events.onFrame != null))
{
events.onFrame.call(websocket,session,frame);
// DO NOT return; - as this is just a read-only notification.
}
try
{
switch (frame.getOpCode())
{
case OpCode.CLOSE:
{
boolean validate = true;
CloseInfo close = new CloseInfo(frame,validate);
if (events.onClose != null)
{
events.onClose.call(websocket,session,close.getStatusCode(),close.getReason());
}
// Is this close frame a response to a prior close?
if (session.getState() == BaseConnection.State.CLOSING)
{
// Then this is close response handshake (to a prior
// outgoing close frame)
session.disconnect();
}
else
{
// This is the initiator for a close handshake
// Trigger close response handshake.
session.notifyClosing();
session.close(close.getStatusCode(),close.getReason());
}
return;
}
case OpCode.PING:
{
WebSocketFrame pong = new WebSocketFrame(OpCode.PONG);
if (frame.getPayloadLength() > 0)
{
// Copy payload
ByteBuffer pongBuf = ByteBuffer.allocate(frame.getPayloadLength());
BufferUtil.clearToFill(pongBuf);
BufferUtil.put(frame.getPayload(),pongBuf);
BufferUtil.flipToFlush(pongBuf,0);
pong.setPayload(pongBuf);
if (LOG.isDebugEnabled())
{
LOG.debug("Pong with {}",BufferUtil.toDetailString(pongBuf));
}
}
session.output("pong",new FutureCallback<String>(),pong);
break;
}
case OpCode.BINARY:
{
if (events.onBinary == null)
{
// not interested in binary events
return;
}
if (activeMessage == null)
{
if (events.onBinary.isStreaming())
{
activeMessage = new MessageInputStream(websocket,events.onBinary,session,bufferPool,policy);
}
else
{
activeMessage = new SimpleBinaryMessage(websocket,events.onBinary,session,policy);
}
}
activeMessage.appendMessage(frame.getPayload());
if (frame.isFin())
{
activeMessage.messageComplete();
activeMessage = null;
}
return;
}
case OpCode.TEXT:
{
if (events.onText == null)
{
// not interested in text events
return;
}
if (activeMessage == null)
{
if (events.onText.isStreaming())
{
activeMessage = new MessageReader(websocket,events.onBinary,session,policy);
}
else
{
activeMessage = new SimpleTextMessage(websocket,events.onText,session,policy);
}
}
activeMessage.appendMessage(frame.getPayload());
if (frame.isFin())
{
activeMessage.messageComplete();
activeMessage = null;
}
return;
}
}
}
catch (NotUtf8Exception e)
{
terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
}
catch (CloseException e)
{
terminateConnection(e.getStatusCode(),e.getMessage());
}
catch (Throwable t)
{
unhandled(t);
}
}
/**
* Internal entry point for connection established
*/
public void onConnect()
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onConnect()",websocket.getClass().getSimpleName());
}
events.onConnect.call(websocket,session);
}
/**
* Set the connection to use for this driver
*
* @param conn
* the connection
*/
public void setSession(WebSocketSession conn)
{
this.session = conn;
}
private void terminateConnection(int statusCode, String rawreason)
{
String reason = rawreason;
if (StringUtil.isNotBlank(reason))
{
// Trim big exception messages here.
if (reason.length() > (WebSocketFrame.MAX_CONTROL_PAYLOAD - 2))
{
reason = reason.substring(0,WebSocketFrame.MAX_CONTROL_PAYLOAD - 2);
}
}
LOG.debug("terminateConnection({},{})",statusCode,rawreason);
session.close(statusCode,reason);
}
@Override
public String toString()
{
return websocket.getClass().getName();
}
private void unhandled(Throwable t)
{
socketLog.warn("Unhandled Error (closing connection)",t);
// Unhandled Error, close the connection.
switch (policy.getBehavior())
{
case SERVER:
terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
break;
case CLIENT:
terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
break;
}
}
}

View File

@ -30,7 +30,7 @@ import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
@ -46,10 +46,10 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
private final BaseConnection baseConnection;
private final WebSocketPolicy policy;
private final String subprotocol;
private final WebSocketEventDriver websocket;
private final EventDriver websocket;
private OutgoingFrames outgoing;
public WebSocketSession(WebSocketEventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol)
public WebSocketSession(EventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol)
{
super();
this.websocket = websocket;

View File

@ -0,0 +1,199 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io.event;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.message.MessageAppender;
import org.eclipse.jetty.websocket.core.io.message.MessageInputStream;
import org.eclipse.jetty.websocket.core.io.message.MessageReader;
import org.eclipse.jetty.websocket.core.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.core.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Handler for Annotated User WebSocket objects.
*/
public class AnnotatedEventDriver extends EventDriver
{
private final EventMethods events;
private MessageAppender activeMessage;
public AnnotatedEventDriver(WebSocketPolicy policy, Object websocket, EventMethods events)
{
super(policy,websocket);
this.events = events;
WebSocket anno = websocket.getClass().getAnnotation(WebSocket.class);
// Setup the policy
if (anno.maxBufferSize() > 0)
{
this.policy.setBufferSize(anno.maxBufferSize());
}
if (anno.maxBinarySize() > 0)
{
this.policy.setMaxBinaryMessageSize(anno.maxBinarySize());
}
if (anno.maxTextSize() > 0)
{
this.policy.setMaxTextMessageSize(anno.maxTextSize());
}
if (anno.maxIdleTime() > 0)
{
this.policy.setIdleTimeout(anno.maxIdleTime());
}
}
@Override
public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (events.onBinary == null)
{
// not interested in binary events
return;
}
if (activeMessage == null)
{
if (events.onBinary.isStreaming())
{
activeMessage = new MessageInputStream(this);
}
else
{
activeMessage = new SimpleBinaryMessage(this);
}
}
activeMessage.appendMessage(buffer);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
}
@Override
public void onBinaryMessage(byte[] data)
{
if (events.onBinary != null)
{
events.onBinary.call(websocket,session,data,0,data.length);
}
}
@Override
public void onClose(CloseInfo close)
{
if (events.onClose != null)
{
events.onClose.call(websocket,session,close.getStatusCode(),close.getReason());
}
}
@Override
public void onConnect()
{
if (events.onConnect != null)
{
events.onConnect.call(websocket,session);
}
}
@Override
public void onException(WebSocketException e)
{
if (events.onException != null)
{
events.onException.call(websocket,session,e);
}
}
@Override
public void onFrame(WebSocketFrame frame)
{
if (events.onFrame != null)
{
events.onFrame.call(websocket,session,frame);
}
}
public void onInputStream(InputStream stream)
{
if (events.onBinary != null)
{
events.onBinary.call(websocket,session,stream);
}
}
public void onReader(Reader reader)
{
if (events.onText != null)
{
events.onText.call(websocket,session,reader);
}
}
@Override
public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (events.onText == null)
{
// not interested in text events
return;
}
if (activeMessage == null)
{
if (events.onText.isStreaming())
{
activeMessage = new MessageReader(this);
}
else
{
activeMessage = new SimpleTextMessage(this);
}
}
activeMessage.appendMessage(buffer);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
}
@Override
public void onTextMessage(String message)
{
if (events.onText != null)
{
events.onText.call(websocket,session,message);
}
}
}

View File

@ -0,0 +1,217 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io.event;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.CloseException;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
*/
public abstract class EventDriver implements IncomingFrames
{
protected final Logger LOG;
protected final WebSocketPolicy policy;
protected final Object websocket;
protected WebSocketSession session;
public EventDriver(WebSocketPolicy policy, Object websocket)
{
this.policy = policy;
this.websocket = websocket;
this.LOG = Log.getLogger(websocket.getClass());
}
public WebSocketPolicy getPolicy()
{
return policy;
}
public WebSocketSession getSession()
{
return session;
}
@Override
public final void incoming(WebSocketException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("incoming({})",e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
onException(e);
}
@Override
public final void incoming(WebSocketFrame frame)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
}
onFrame(frame);
try
{
switch (frame.getOpCode())
{
case OpCode.CLOSE:
{
boolean validate = true;
CloseInfo close = new CloseInfo(frame,validate);
onClose(close);
// Is this close frame a response to a prior close?
if (session.getState() == BaseConnection.State.CLOSING)
{
// Then this is close response handshake (to a prior
// outgoing close frame)
session.disconnect();
}
else
{
// This is the initiator for a close handshake
// Trigger close response handshake.
session.notifyClosing();
session.close(close.getStatusCode(),close.getReason());
}
return;
}
case OpCode.PING:
{
WebSocketFrame pong = new WebSocketFrame(OpCode.PONG);
if (frame.getPayloadLength() > 0)
{
// Copy payload
ByteBuffer pongBuf = ByteBuffer.allocate(frame.getPayloadLength());
BufferUtil.clearToFill(pongBuf);
BufferUtil.put(frame.getPayload(),pongBuf);
BufferUtil.flipToFlush(pongBuf,0);
pong.setPayload(pongBuf);
if (LOG.isDebugEnabled())
{
LOG.debug("Pong with {}",BufferUtil.toDetailString(pongBuf));
}
}
session.output("pong",new FutureCallback<String>(),pong);
break;
}
case OpCode.BINARY:
{
onBinaryFrame(frame.getPayload(),frame.isFin());
return;
}
case OpCode.TEXT:
{
onTextFrame(frame.getPayload(),frame.isFin());
return;
}
}
}
catch (NotUtf8Exception e)
{
terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
}
catch (CloseException e)
{
terminateConnection(e.getStatusCode(),e.getMessage());
}
catch (Throwable t)
{
unhandled(t);
}
}
public abstract void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException;
public abstract void onBinaryMessage(byte[] data);
public abstract void onClose(CloseInfo close);
public abstract void onConnect();
public abstract void onException(WebSocketException e);
public abstract void onFrame(WebSocketFrame frame);
public abstract void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException;
public abstract void onTextMessage(String message);
public void setSession(WebSocketSession session)
{
this.session = session;
}
protected void terminateConnection(int statusCode, String rawreason)
{
String reason = rawreason;
if (StringUtil.isNotBlank(reason))
{
// Trim big exception messages here.
if (reason.length() > (WebSocketFrame.MAX_CONTROL_PAYLOAD - 2))
{
reason = reason.substring(0,WebSocketFrame.MAX_CONTROL_PAYLOAD - 2);
}
}
LOG.debug("terminateConnection({},{})",statusCode,rawreason);
session.close(statusCode,reason);
}
private void unhandled(Throwable t)
{
LOG.warn("Unhandled Error (closing connection)",t);
// Unhandled Error, close the connection.
switch (policy.getBehavior())
{
case SERVER:
terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
break;
case CLIENT:
terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
break;
}
}
}

View File

@ -16,14 +16,13 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
import java.io.InputStream;
import java.io.Reader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.util.StringUtil;
@ -34,80 +33,30 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.protocol.Frame;
public class EventMethodsCache
/**
* Create EventDriver implementations.
*/
public class EventDriverFactory
{
@SuppressWarnings("serial")
public static class InvalidSignatureException extends InvalidWebSocketException
{
public static InvalidSignatureException build(Method method, Class<? extends Annotation> annoClass, ParamList... paramlists)
{
// Build big detailed exception to help the developer
StringBuilder err = new StringBuilder();
err.append("Invalid declaration of ");
err.append(method);
err.append(StringUtil.__LINE_SEPARATOR);
err.append("Acceptable method declarations for @");
err.append(annoClass.getSimpleName());
err.append(" are:");
for (ParamList validParams : paramlists)
{
for (Class<?>[] params : validParams)
{
err.append(StringUtil.__LINE_SEPARATOR);
err.append("public void ").append(method.getName());
err.append('(');
boolean delim = false;
for (Class<?> type : params)
{
if (delim)
{
err.append(',');
}
err.append(' ');
err.append(type.getName());
if (type.isArray())
{
err.append("[]");
}
delim = true;
}
err.append(')');
}
}
return new InvalidSignatureException(err.toString());
}
public InvalidSignatureException(String message)
{
super(message);
}
}
@SuppressWarnings("serial")
private static class ParamList extends ArrayList<Class<?>[]>
{
public void addParams(Class<?>... paramTypes)
{
this.add(paramTypes);
}
}
/**
* Parameter list for &#064;OnWebSocketMessage (Binary mode)
*/
private static final ParamList validBinaryParams;
/**
* Parameter list for &#064;OnWebSocketConnect
*/
private static final ParamList validConnectParams;
/**
* Parameter list for &#064;OnWebSocketClose
*/
private static final ParamList validCloseParams;
/**
* Parameter list for &#064;OnWebSocketFrame
*/
@ -145,10 +94,12 @@ public class EventMethodsCache
}
private ConcurrentHashMap<Class<?>, EventMethods> cache;
private final WebSocketPolicy policy;
public EventMethodsCache()
public EventDriverFactory(WebSocketPolicy policy)
{
cache = new ConcurrentHashMap<>();
this.policy = policy;
this.cache = new ConcurrentHashMap<>();
}
private void assertIsPublicNonStatic(Method method)
@ -235,11 +186,6 @@ public class EventMethodsCache
}
}
public int count()
{
return cache.size();
}
/**
* Perform the basic discovery mechanism for WebSocket events from the provided pojo.
*
@ -250,16 +196,10 @@ public class EventMethodsCache
*/
private EventMethods discoverMethods(Class<?> pojo) throws InvalidWebSocketException
{
if (WebSocketListener.class.isAssignableFrom(pojo))
{
return scanListenerMethods(pojo);
}
WebSocket anno = pojo.getAnnotation(WebSocket.class);
if (anno == null)
{
throw new InvalidWebSocketException(pojo.getName() + " does not implement " + WebSocketListener.class.getName() + " or use the @"
+ WebSocket.class.getName() + " annotation");
return null;
}
return scanAnnotatedMethods(pojo);
@ -276,20 +216,27 @@ public class EventMethodsCache
return cache.get(pojo);
}
EventMethods methods = discoverMethods(pojo);
if (methods == null)
{
return null;
}
cache.put(pojo,methods);
return methods;
}
private boolean isSameParameters(Class<?>[] actual, Class<?>[] params)
{
if(actual.length != params.length) {
if (actual.length != params.length)
{
// skip
return false;
}
int len = params.length;
for(int i=0; i<len; i++) {
if(!actual[i].equals(params[i])) {
for (int i = 0; i < len; i++)
{
if (!actual[i].equals(params[i]))
{
return false; // not valid
}
}
@ -315,23 +262,10 @@ public class EventMethodsCache
return false;
}
/**
* Register a pojo with the cache.
*
* @param pojo
* the pojo to register with the cache.
* @throws InvalidWebSocketException
* if the pojo does not conform to a WebSocket implementation.
*/
public void register(Class<?> pojo) throws InvalidWebSocketException
{
getMethods(pojo);
}
private EventMethods scanAnnotatedMethods(Class<?> pojo)
{
Class<?> clazz = pojo;
EventMethods events = new EventMethods(pojo,true);
EventMethods events = new EventMethods(pojo);
clazz = pojo;
while (clazz.getAnnotation(WebSocket.class) != null)
@ -396,23 +330,41 @@ public class EventMethodsCache
return events;
}
private EventMethods scanListenerMethods(Class<?> pojo)
{
EventMethods events = new EventMethods(pojo,false);
// This is a WebSocketListener object
events.onConnect = new EventMethod(pojo,"onWebSocketConnect",WebSocketConnection.class);
events.onClose = new EventMethod(pojo,"onWebSocketClose",int.class,String.class);
events.onBinary = new EventMethod(pojo,"onWebSocketBinary",byte[].class,int.class,int.class);
events.onText = new EventMethod(pojo,"onWebSocketText",String.class);
events.onException = new EventMethod(pojo,"onWebSocketException",WebSocketException.class);
return events;
}
@Override
public String toString()
{
return String.format("EventMethodsCache [cache.count=%d]",cache.size());
}
/**
* Wrap the given WebSocket object instance in a suitable EventDriver
*
* @param websocket
* the websocket instance to wrap. Must either implement {@link WebSocketListener} or be annotated with {@link WebSocket &#064WebSocket}
* @return appropriate EventDriver for this websocket instance.
*/
public EventDriver wrap(Object websocket)
{
if (websocket == null)
{
throw new InvalidWebSocketException("null websocket object");
}
if (websocket instanceof WebSocketListener)
{
WebSocketPolicy pojoPolicy = policy.clonePolicy();
WebSocketListener listener = (WebSocketListener)websocket;
return new ListenerEventDriver(pojoPolicy,listener);
}
EventMethods methods = getMethods(websocket.getClass());
if (methods != null)
{
WebSocketPolicy pojoPolicy = policy.clonePolicy();
return new AnnotatedEventDriver(pojoPolicy,websocket,methods);
}
throw new InvalidWebSocketException(websocket.getClass().getName() + " does not implement " + WebSocketListener.class.getName()
+ " or declare @WebSocket");
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
import java.io.InputStream;
import java.io.Reader;
@ -96,6 +96,7 @@ public class EventMethod
throw new IllegalArgumentException("Call arguments length [" + args.length + "] must always be greater than or equal to captured args length ["
+ paramTypes.length + "]");
}
try
{
this.method.invoke(obj,args);
@ -126,13 +127,15 @@ public class EventMethod
{
return;
}
for(Class<?> paramType: paramTypes)
for (Class<?> paramType : paramTypes)
{
if(WebSocketConnection.class.isAssignableFrom(paramType)) {
if (WebSocketConnection.class.isAssignableFrom(paramType))
{
this.hasConnection = true;
}
if(Reader.class.isAssignableFrom(paramType)||
InputStream.class.isAssignableFrom(paramType)) {
if (Reader.class.isAssignableFrom(paramType) || InputStream.class.isAssignableFrom(paramType))
{
this.isStreaming = true;
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
/**
* A representation of the methods available to call for a particular class.
@ -26,20 +26,16 @@ package org.eclipse.jetty.websocket.core.driver;
public class EventMethods
{
private Class<?> pojoClass;
private boolean isAnnotated = false;
public EventMethod onConnect = null;
public EventMethod onClose = null;
public EventMethod onBinary = null;
public EventMethod onBinaryStream = null;
public EventMethod onText = null;
public EventMethod onTextStream = null;
public EventMethod onException = null;
public EventMethod onFrame = null;
public EventMethods(Class<?> pojoClass, boolean annotated)
public EventMethods(Class<?> pojoClass)
{
this.pojoClass = pojoClass;
this.isAnnotated = annotated;
}
@Override
@ -86,19 +82,12 @@ public class EventMethods
return result;
}
public boolean isAnnotated()
{
return isAnnotated;
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("EventMethods [pojoClass=");
builder.append(pojoClass);
builder.append(", isAnnotated=");
builder.append(isAnnotated);
builder.append(", onConnect=");
builder.append(onConnect);
builder.append(", onClose=");

View File

@ -0,0 +1,73 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io.event;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.api.InvalidWebSocketException;
@SuppressWarnings("serial")
public class InvalidSignatureException extends InvalidWebSocketException
{
public static InvalidSignatureException build(Method method, Class<? extends Annotation> annoClass, ParamList... paramlists)
{
// Build big detailed exception to help the developer
StringBuilder err = new StringBuilder();
err.append("Invalid declaration of ");
err.append(method);
err.append(StringUtil.__LINE_SEPARATOR);
err.append("Acceptable method declarations for @");
err.append(annoClass.getSimpleName());
err.append(" are:");
for (ParamList validParams : paramlists)
{
for (Class<?>[] params : validParams)
{
err.append(StringUtil.__LINE_SEPARATOR);
err.append("public void ").append(method.getName());
err.append('(');
boolean delim = false;
for (Class<?> type : params)
{
if (delim)
{
err.append(',');
}
err.append(' ');
err.append(type.getName());
if (type.isArray())
{
err.append("[]");
}
delim = true;
}
err.append(')');
}
}
return new InvalidSignatureException(err.toString());
}
public InvalidSignatureException(String message)
{
super(message);
}
}

View File

@ -0,0 +1,119 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io.event;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.message.MessageAppender;
import org.eclipse.jetty.websocket.core.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.core.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Handler for {@link WebSocketListener} based User WebSocket implementations.
*/
public class ListenerEventDriver extends EventDriver
{
private final WebSocketListener listener;
private MessageAppender activeMessage;
public ListenerEventDriver(WebSocketPolicy policy, WebSocketListener listener)
{
super(policy,listener);
this.listener = listener;
}
@Override
public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (activeMessage == null)
{
activeMessage = new SimpleBinaryMessage(this);
}
activeMessage.appendMessage(buffer);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
}
@Override
public void onBinaryMessage(byte[] data)
{
listener.onWebSocketBinary(data,0,data.length);
}
@Override
public void onClose(CloseInfo close)
{
int statusCode = close.getStatusCode();
String reason = close.getReason();
listener.onWebSocketClose(statusCode,reason);
}
@Override
public void onConnect()
{
LOG.debug("onConnect()");
listener.onWebSocketConnect(session);
}
@Override
public void onException(WebSocketException e)
{
listener.onWebSocketException(e);
}
@Override
public void onFrame(WebSocketFrame frame)
{
/* ignore, not supported by WebSocketListener */
}
@Override
public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (activeMessage == null)
{
activeMessage = new SimpleTextMessage(this);
}
activeMessage.appendMessage(buffer);
if (fin)
{
activeMessage.messageComplete();
activeMessage = null;
}
}
@Override
public void onTextMessage(String message)
{
listener.onWebSocketText(message);
}
}

View File

@ -0,0 +1,33 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io.event;
import java.util.ArrayList;
/**
* Simple class for representing a list of class arrays.
*/
@SuppressWarnings("serial")
public class ParamList extends ArrayList<Class<?>[]>
{
public void addParams(Class<?>... paramTypes)
{
this.add(paramTypes);
}
}

View File

@ -22,11 +22,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethod;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.AnnotatedEventDriver;
/**
* Support class for reading binary message data as an InputStream.
@ -38,25 +35,17 @@ public class MessageInputStream extends InputStream implements MessageAppender
* Threshold (of bytes) to perform compaction at
*/
private static final int COMPACT_THRESHOLD = 5;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final AnnotatedEventDriver driver;
private final ByteBuffer buf;
private int size;
private boolean finished;
private boolean needsNotification;
private int readPosition;
public MessageInputStream(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
public MessageInputStream(AnnotatedEventDriver driver)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.bufferPool = bufferPool;
this.policy = policy;
this.buf = bufferPool.acquire(BUFFER_SIZE,false);
this.driver = driver;
this.buf = ByteBuffer.allocate(BUFFER_SIZE);
BufferUtil.clearToFill(this.buf);
size = 0;
readPosition = this.buf.position();
@ -78,7 +67,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
return;
}
policy.assertValidBinaryMessageSize(size + payload.remaining());
driver.getPolicy().assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
synchronized (buf)
@ -92,7 +81,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
if (needsNotification)
{
needsNotification = true;
this.onEvent.call(websocket,session,this);
this.driver.onInputStream(this);
}
}
@ -101,7 +90,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
{
finished = true;
super.close();
this.bufferPool.release(this.buf);
}
@Override

View File

@ -23,9 +23,7 @@ import java.io.Reader;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethod;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.AnnotatedEventDriver;
/**
* Support class for reading text message data as an Reader.
@ -34,21 +32,15 @@ import org.eclipse.jetty.websocket.core.io.WebSocketSession;
*/
public class MessageReader extends Reader implements MessageAppender
{
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final WebSocketPolicy policy;
private final AnnotatedEventDriver driver;
private final Utf8StringBuilder utf;
private int size;
private boolean finished;
private boolean needsNotification;
public MessageReader(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
public MessageReader(AnnotatedEventDriver driver)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.policy = policy;
this.driver = driver;
this.utf = new Utf8StringBuilder();
size = 0;
finished = false;
@ -69,7 +61,7 @@ public class MessageReader extends Reader implements MessageAppender
return;
}
policy.assertValidTextMessageSize(size + payload.remaining());
driver.getPolicy().assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
synchronized (utf)
@ -80,7 +72,7 @@ public class MessageReader extends Reader implements MessageAppender
if (needsNotification)
{
needsNotification = true;
this.onEvent.call(websocket,session,this);
this.driver.onReader(this);
}
}

View File

@ -23,27 +23,19 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethod;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
public class SimpleBinaryMessage implements MessageAppender
{
private static final int BUFFER_SIZE = 65535;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final WebSocketPolicy policy;
private final EventDriver onEvent;
private final ByteArrayOutputStream out;
private int size;
private boolean finished;
public SimpleBinaryMessage(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
public SimpleBinaryMessage(EventDriver onEvent)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.policy = policy;
this.out = new ByteArrayOutputStream(BUFFER_SIZE);
finished = false;
}
@ -62,7 +54,7 @@ public class SimpleBinaryMessage implements MessageAppender
return;
}
policy.assertValidBinaryMessageSize(size + payload.remaining());
onEvent.getPolicy().assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
BufferUtil.writeTo(payload,out);
@ -73,6 +65,6 @@ public class SimpleBinaryMessage implements MessageAppender
{
finished = true;
byte data[] = out.toByteArray();
this.onEvent.call(websocket,session,data,0,data.length);
onEvent.onBinaryMessage(data);
}
}

View File

@ -22,26 +22,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethod;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
public class SimpleTextMessage implements MessageAppender
{
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final WebSocketPolicy policy;
private final EventDriver onEvent;
private final Utf8StringBuilder utf;
private int size = 0;
private boolean finished;
public SimpleTextMessage(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
public SimpleTextMessage(EventDriver onEvent)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.policy = policy;
this.utf = new Utf8StringBuilder();
size = 0;
finished = false;
@ -61,7 +53,7 @@ public class SimpleTextMessage implements MessageAppender
return;
}
policy.assertValidTextMessageSize(size + payload.remaining());
onEvent.getPolicy().assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
// allow for fast fail of BAD utf (incomplete utf will trigger on messageComplete)
@ -74,6 +66,6 @@ public class SimpleTextMessage implements MessageAppender
finished = true;
// notify event
this.onEvent.call(websocket,session,utf.toString());
onEvent.onTextMessage(utf.toString());
}
}

View File

@ -18,15 +18,16 @@
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.websocket.core.driver.EventMethodsCacheTest;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriverTest;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactoryTest;
import org.eclipse.jetty.websocket.core.io.event.EventDriverTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ org.eclipse.jetty.websocket.core.ab.AllTests.class, EventMethodsCacheTest.class, WebSocketEventDriverTest.class,
org.eclipse.jetty.websocket.core.extensions.AllTests.class, org.eclipse.jetty.websocket.core.protocol.AllTests.class, GeneratorParserRoundtripTest.class })
{ org.eclipse.jetty.websocket.core.ab.AllTests.class, EventDriverFactoryTest.class, EventDriverTest.class,
org.eclipse.jetty.websocket.core.extensions.AllTests.class, org.eclipse.jetty.websocket.core.protocol.AllTests.class,
GeneratorParserRoundtripTest.class })
public class AllTests
{
/* nothing to do here */

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.websocket.core.examples;
import org.eclipse.jetty.websocket.core.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
public class AdapterConnectCloseSocket extends WebSocketAdapter
{

View File

@ -23,7 +23,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
@WebSocket
public class AnnotatedBinaryArraySocket

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
@WebSocket
public class AnnotatedBinaryStreamSocket

View File

@ -23,7 +23,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.core.protocol.Frame;
@WebSocket

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.core.protocol.Frame;
@WebSocket

View File

@ -23,7 +23,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
@WebSocket
public class AnnotatedTextSocket

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.core.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.core.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
@WebSocket
public class AnnotatedTextStreamSocket

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.core.examples;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.driver.EventCapture;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
public class ListenerBasicSocket implements WebSocketListener
{

View File

@ -19,8 +19,7 @@
package org.eclipse.jetty.websocket.core.io;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.protocol.OutgoingFramesCapture;
import org.junit.rules.TestName;
@ -36,7 +35,7 @@ public class LocalWebSocketSession extends WebSocketSession
setOutgoing(outgoingCapture);
}
public LocalWebSocketSession(TestName testname, WebSocketEventDriver driver)
public LocalWebSocketSession(TestName testname, EventDriver driver)
{
super(driver,new LocalWebSocketConnection(testname),WebSocketPolicy.newServerPolicy(),"testing");
this.id = testname.getMethodName();

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
import java.util.ArrayList;
import java.util.regex.Pattern;

View File

@ -16,7 +16,9 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.websocket.core.annotations.BadBinarySignatureSocket;
import org.eclipse.jetty.websocket.core.annotations.BadDuplicateBinarySocket;
@ -31,9 +33,7 @@ import org.eclipse.jetty.websocket.core.annotations.NotASocket;
import org.eclipse.jetty.websocket.core.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.driver.EventMethod;
import org.eclipse.jetty.websocket.core.driver.EventMethods;
import org.eclipse.jetty.websocket.core.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.examples.AdapterConnectCloseSocket;
import org.eclipse.jetty.websocket.core.examples.AnnotatedBinaryArraySocket;
import org.eclipse.jetty.websocket.core.examples.AnnotatedBinaryStreamSocket;
@ -43,12 +43,7 @@ import org.eclipse.jetty.websocket.core.examples.ListenerBasicSocket;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class EventMethodsCacheTest
public class EventDriverFactoryTest
{
private void assertHasEventMethod(String message, EventMethod actual)
{
@ -69,24 +64,12 @@ public class EventMethodsCacheTest
@Test
public void testAdapterConnectCloseSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(AdapterConnectCloseSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
AdapterConnectCloseSocket socket = new AdapterConnectCloseSocket();
EventDriver driver = factory.wrap(socket);
String classId = AdapterConnectCloseSocket.class.getSimpleName();
Assert.assertThat("EventMethods for " + classId,methods,notNullValue());
// Directly Declared
assertHasEventMethod(classId + ".onClose",methods.onClose);
assertHasEventMethod(classId + ".onConnect",methods.onConnect);
// From WebSocketAdapter
assertHasEventMethod(classId + ".onBinary",methods.onBinary);
assertHasEventMethod(classId + ".onException",methods.onException);
assertHasEventMethod(classId + ".onText",methods.onText);
// Advanced, only available from @OnWebSocketFrame annotation
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertThat("EventDriver for " + classId,driver,instanceOf(ListenerEventDriver.class));
}
/**
@ -95,11 +78,11 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBadDuplicateBinarySocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
try
{
// Should toss exception
cache.getMethods(BadDuplicateBinarySocket.class);
factory.getMethods(BadDuplicateBinarySocket.class);
Assert.fail("Should have thrown " + InvalidWebSocketException.class);
}
catch (InvalidWebSocketException e)
@ -115,11 +98,11 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBadDuplicateFrameSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
try
{
// Should toss exception
cache.getMethods(BadDuplicateFrameSocket.class);
factory.getMethods(BadDuplicateFrameSocket.class);
Assert.fail("Should have thrown " + InvalidWebSocketException.class);
}
catch (InvalidWebSocketException e)
@ -135,11 +118,11 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBadSignature_NonVoidReturn()
{
EventMethodsCache cache = new EventMethodsCache();
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
try
{
// Should toss exception
cache.getMethods(BadBinarySignatureSocket.class);
factory.getMethods(BadBinarySignatureSocket.class);
Assert.fail("Should have thrown " + InvalidWebSocketException.class);
}
catch (InvalidWebSocketException e)
@ -155,11 +138,11 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBadSignature_Static()
{
EventMethodsCache cache = new EventMethodsCache();
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
try
{
// Should toss exception
cache.getMethods(BadTextSignatureSocket.class);
factory.getMethods(BadTextSignatureSocket.class);
Assert.fail("Should have thrown " + InvalidWebSocketException.class);
}
catch (InvalidWebSocketException e)
@ -175,8 +158,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBinaryArraySocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(AnnotatedBinaryArraySocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(AnnotatedBinaryArraySocket.class);
String classId = AnnotatedBinaryArraySocket.class.getSimpleName();
@ -199,8 +182,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedBinaryStreamSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(AnnotatedBinaryStreamSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(AnnotatedBinaryStreamSocket.class);
String classId = AnnotatedBinaryStreamSocket.class.getSimpleName();
@ -223,8 +206,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedMyEchoBinarySocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(MyEchoBinarySocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(MyEchoBinarySocket.class);
String classId = MyEchoBinarySocket.class.getSimpleName();
@ -244,8 +227,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedMyEchoSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(MyEchoSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(MyEchoSocket.class);
String classId = MyEchoSocket.class.getSimpleName();
@ -265,8 +248,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedMyStatelessEchoSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(MyStatelessEchoSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(MyStatelessEchoSocket.class);
String classId = MyStatelessEchoSocket.class.getSimpleName();
@ -289,8 +272,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedNoop()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(NoopSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(NoopSocket.class);
String classId = NoopSocket.class.getSimpleName();
@ -310,8 +293,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedOnFrame()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(FrameSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(FrameSocket.class);
String classId = FrameSocket.class.getSimpleName();
@ -331,8 +314,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedTextSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(AnnotatedTextSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(AnnotatedTextSocket.class);
String classId = AnnotatedTextSocket.class.getSimpleName();
@ -355,8 +338,8 @@ public class EventMethodsCacheTest
@Test
public void testAnnotatedTextStreamSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(AnnotatedTextStreamSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
EventMethods methods = factory.getMethods(AnnotatedTextStreamSocket.class);
String classId = AnnotatedTextStreamSocket.class.getSimpleName();
@ -379,11 +362,12 @@ public class EventMethodsCacheTest
@Test
public void testBadNotASocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
try
{
NotASocket bad = new NotASocket();
// Should toss exception
cache.getMethods(NotASocket.class);
factory.wrap(bad);
}
catch (InvalidWebSocketException e)
{
@ -398,18 +382,11 @@ public class EventMethodsCacheTest
@Test
public void testListenerBasicSocket()
{
EventMethodsCache cache = new EventMethodsCache();
EventMethods methods = cache.getMethods(ListenerBasicSocket.class);
EventDriverFactory factory = new EventDriverFactory(WebSocketPolicy.newClientPolicy());
ListenerBasicSocket socket = new ListenerBasicSocket();
EventDriver driver = factory.wrap(socket);
String classId = AdapterConnectCloseSocket.class.getSimpleName();
Assert.assertThat("ListenerBasicSocket for " + classId,methods,notNullValue());
assertHasEventMethod(classId + ".onClose",methods.onClose);
assertHasEventMethod(classId + ".onConnect",methods.onConnect);
assertHasEventMethod(classId + ".onBinary",methods.onBinary);
assertHasEventMethod(classId + ".onException",methods.onException);
assertHasEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
String classId = ListenerBasicSocket.class.getSimpleName();
Assert.assertThat("EventDriver for " + classId,driver,instanceOf(ListenerEventDriver.class));
}
}

View File

@ -16,14 +16,10 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.driver;
package org.eclipse.jetty.websocket.core.io.event;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.examples.AdapterConnectCloseSocket;
import org.eclipse.jetty.websocket.core.examples.AnnotatedBinaryArraySocket;
import org.eclipse.jetty.websocket.core.examples.AnnotatedBinaryStreamSocket;
@ -37,7 +33,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class WebSocketEventDriverTest
public class EventDriverTest
{
@Rule
public TestName testname = new TestName();
@ -47,20 +43,11 @@ public class WebSocketEventDriverTest
return WebSocketFrame.binary().setFin(fin).setPayload(content);
}
private WebSocketEventDriver newDriver(Object websocket)
{
EventMethodsCache methodsCache = new EventMethodsCache();
methodsCache.register(websocket.getClass());
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
ByteBufferPool bufferPool = new MappedByteBufferPool();
return new WebSocketEventDriver(websocket,methodsCache,policy,bufferPool);
}
@Test
public void testAdapter_ConnectClose()
{
AdapterConnectCloseSocket socket = new AdapterConnectCloseSocket();
WebSocketEventDriver driver = newDriver(socket);
EventDriver driver = wrap(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setSession(conn);
@ -76,7 +63,7 @@ public class WebSocketEventDriverTest
public void testAnnotated_ByteArray()
{
AnnotatedBinaryArraySocket socket = new AnnotatedBinaryArraySocket();
WebSocketEventDriver driver = newDriver(socket);
EventDriver driver = wrap(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setSession(conn);
@ -94,7 +81,7 @@ public class WebSocketEventDriverTest
public void testAnnotated_Frames()
{
AnnotatedFramesSocket socket = new AnnotatedFramesSocket();
WebSocketEventDriver driver = newDriver(socket);
EventDriver driver = wrap(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setSession(conn);
@ -117,7 +104,7 @@ public class WebSocketEventDriverTest
public void testAnnotated_InputStream()
{
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
WebSocketEventDriver driver = newDriver(socket);
EventDriver driver = wrap(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setSession(conn);
@ -135,7 +122,7 @@ public class WebSocketEventDriverTest
public void testListener_Text()
{
ListenerBasicSocket socket = new ListenerBasicSocket();
WebSocketEventDriver driver = newDriver(socket);
EventDriver driver = wrap(socket);
LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setSession(conn);
@ -148,4 +135,11 @@ public class WebSocketEventDriverTest
socket.capture.assertEventStartsWith(1,"onWebSocketText(\"Hello World\")");
socket.capture.assertEventStartsWith(2,"onWebSocketClose(1000,");
}
private EventDriver wrap(Object websocket)
{
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
EventDriverFactory factory = new EventDriverFactory(policy);
return factory.wrap(websocket);
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -44,7 +45,7 @@ public abstract class WebSocketHandler extends HandlerWrapper
}
@Override
public void registerWebSockets(WebSocketServerFactory factory)
public void configure(WebSocketServerFactory factory)
{
factory.register(websocketPojo);
}
@ -59,6 +60,8 @@ public abstract class WebSocketHandler extends HandlerWrapper
webSocketFactory = new WebSocketServerFactory(policy);
}
public abstract void configure(WebSocketServerFactory factory);
public void configurePolicy(WebSocketPolicy policy)
{
/* leave at default */
@ -68,7 +71,7 @@ public abstract class WebSocketHandler extends HandlerWrapper
protected void doStart() throws Exception
{
super.doStart();
registerWebSockets(webSocketFactory);
configure(webSocketFactory);
}
public WebSocketServerFactory getWebSocketFactory()
@ -98,6 +101,4 @@ public abstract class WebSocketHandler extends HandlerWrapper
}
super.handle(target,baseRequest,request,response);
}
public abstract void registerWebSockets(WebSocketServerFactory factory);
}

View File

@ -49,12 +49,12 @@ import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.core.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.core.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
@ -79,11 +79,10 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
private final Scheduler scheduler = new TimerScheduler();
private final String supportedVersions;
private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache;
private final ByteBufferPool bufferPool;
private final EventDriverFactory eventDriverFactory;
private final WebSocketExtensionRegistry extensionRegistry;
private WebSocketCreator creator;
private Class<?> firstRegisteredClass;
private List<Class<?>> registeredSocketClasses;
public WebSocketServerFactory(WebSocketPolicy policy)
{
@ -95,9 +94,10 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
addBean(scheduler);
addBean(bufferPool);
this.registeredSocketClasses = new ArrayList<>();
this.basePolicy = policy;
this.methodsCache = new EventMethodsCache();
this.bufferPool = bufferPool;
this.eventDriverFactory = new EventDriverFactory(basePolicy);
this.extensionRegistry = new WebSocketExtensionRegistry(basePolicy,bufferPool);
this.creator = this;
@ -143,9 +143,8 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
}
// Send the upgrade
WebSocketPolicy objPolicy = this.basePolicy.clonePolicy();
WebSocketEventDriver websocket = new WebSocketEventDriver(websocketPojo,methodsCache,objPolicy,bufferPool);
return upgrade(sockreq,sockresp,websocket);
EventDriver driver = eventDriverFactory.wrap(websocketPojo);
return upgrade(sockreq,sockresp,driver);
}
protected void closeConnections()
@ -157,26 +156,30 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
sessions.clear();
}
/**
* Default Creator logic
*/
@Override
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
{
if (methodsCache.count() < 1)
if (registeredSocketClasses.size() < 1)
{
throw new WebSocketException("No WebSockets have been registered with the factory. Cannot use default implementation of WebSocketCreator.");
}
if (methodsCache.count() > 1)
if (registeredSocketClasses.size() > 1)
{
LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
}
Class<?> firstClass = registeredSocketClasses.get(0);
try
{
return firstRegisteredClass.newInstance();
return firstClass.newInstance();
}
catch (InstantiationException | IllegalAccessException e)
{
throw new WebSocketException("Unable to create instance of " + firstRegisteredClass,e);
throw new WebSocketException("Unable to create instance of " + firstClass,e);
}
}
@ -272,13 +275,17 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return protocols;
}
public void register(Class<?> websocketClass)
/**
* Register a websocket class pojo with the default {@link WebSocketCreator}.
* <p>
* Note: only required if using the default {@link WebSocketCreator} provided by this factory.
*
* @param websocketPojo
* the class to instantiate for each incoming websocket upgrade request.
*/
public void register(Class<?> websocketPojo)
{
if (firstRegisteredClass == null)
{
firstRegisteredClass = websocketClass;
}
methodsCache.register(websocketClass);
registeredSocketClasses.add(websocketPojo);
}
public boolean sessionClosed(WebSocketSession session)
@ -317,12 +324,12 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
* The request to upgrade
* @param response
* The response to upgrade
* @param websocket
* @param driver
* The websocket handler implementation to use
* @throws IOException
* in case of I/O errors
*/
public boolean upgrade(ServletWebSocketRequest request, ServletWebSocketResponse response, WebSocketEventDriver websocket) throws IOException
public boolean upgrade(ServletWebSocketRequest request, ServletWebSocketResponse response, EventDriver driver) throws IOException
{
if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
{
@ -356,7 +363,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketServerConnection connection = new WebSocketServerConnection(endp,executor,scheduler,websocket.getPolicy(),bufferPool,this);
WebSocketServerConnection connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
// Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
@ -364,7 +371,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
LOG.debug("AsyncWebSocketConnection: {}",connection);
// Initialize / Negotiate Extensions
WebSocketSession session = new WebSocketSession(websocket,connection,getPolicy(),response.getAcceptedSubProtocol());
WebSocketSession session = new WebSocketSession(driver,connection,getPolicy(),response.getAcceptedSubProtocol());
connection.setSession(session);
List<Extension> extensions = initExtensions(request.getExtensions());
request.setValidExtensions(extensions);

View File

@ -23,8 +23,8 @@ import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ org.eclipse.jetty.websocket.server.ab.AllTests.class, DeflateExtensionTest.class, FragmentExtensionTest.class, IdentityExtensionTest.class, LoadTest.class,
WebSocketInvalidVersionTest.class, WebSocketLoadRFC6455Test.class, WebSocketOverSSLTest.class, WebSocketServletRFCTest.class })
{ org.eclipse.jetty.websocket.server.ab.AllTests.class, ChromeTest.class, DeflateExtensionTest.class, FragmentExtensionTest.class, IdentityExtensionTest.class,
LoadTest.class, WebSocketInvalidVersionTest.class, WebSocketLoadRFC6455Test.class, WebSocketOverSSLTest.class, WebSocketServletRFCTest.class })
public class AllTests
{
/* let junit do the rest */

View File

@ -18,8 +18,8 @@
package org.eclipse.jetty.websocket.server.examples.echo;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -99,9 +99,8 @@ public class ExampleEchoServer
wsHandler = new WebSocketHandler()
{
@Override
public void registerWebSockets(WebSocketServerFactory factory)
public void configure(WebSocketServerFactory factory)
{
factory.register(BigEchoSocket.class);
factory.setCreator(new EchoCreator());
}
};

View File

@ -172,7 +172,6 @@
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>