Issue #207 - Support javax.websocket version 1.1

+ Cleaning up WebSocketPolicy usage
This commit is contained in:
Joakim Erdfelt 2016-08-23 16:57:09 -07:00
parent 9c1ac5fedd
commit 0a99c20a73
24 changed files with 217 additions and 185 deletions

View File

@ -95,8 +95,6 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
client.setSessionFactory(new JsrSessionFactory(this));
addBean(client);
// annotatedConfigFunctions.add(new ClientEndpointConfigFunction());
ShutdownThread.register(this);
}

View File

@ -42,6 +42,7 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -70,9 +71,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, LogicalConnection connection)
public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
{
super(container, requestURI, websocket, connection);
super(container, requestURI, websocket, policy, connection);
this.container = container;

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.jsr356;
import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -34,9 +35,9 @@ public class JsrSessionFactory implements SessionFactory
}
@Override
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
{
return new JsrSession(container,connection.getId(),requestURI,websocket,connection);
return new JsrSession(container,connection.getId(),requestURI,websocket,policy,connection);
}
@Override

View File

@ -600,6 +600,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
return null;
}, onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for PongMesssage types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
return false;
@ -629,6 +638,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
);
setOnBinary(streamSink, onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for Streaming Binary types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
}
@ -659,6 +677,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
);
setOnText(streamSink, onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for Streaming Text types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
}
@ -688,6 +715,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
invoker.apply(endpoint, args);
return null;
}), onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for Partial Binary Buffer types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
return false;
@ -715,6 +751,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
invoker.apply(endpoint, args);
return null;
}), onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for Partial Binary Array types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
return false;
@ -743,6 +788,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
invoker.apply(endpoint, args);
return null;
}), onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
StringBuilder err = new StringBuilder();
err.append("@OnMessage.maxMessageSize=").append(annotation.maxMessageSize());
err.append(" not valid for Partial Text types: ");
ReflectUtils.append(err,onMsg);
LOG.warn(err.toString());
}
return true;
}
return false;
@ -773,6 +827,11 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
);
setOnBinary(binarySink, onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
policy.setMaxBinaryMessageSize(annotation.maxMessageSize());
}
return true;
}
}
@ -804,6 +863,11 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
);
setOnText(textSink, onMsg);
OnMessage annotation = onMsg.getAnnotation(OnMessage.class);
if (annotation.maxMessageSize() > 0)
{
policy.setMaxTextMessageSize(annotation.maxMessageSize());
}
return true;
}
}

View File

@ -101,7 +101,6 @@ public class OnCloseTest
public OnCloseTest(Case testcase)
{
this.testcase = testcase;
System.err.printf("Testing @OnClose for %s%n", testcase.closeClass.getName());
}
@Test

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.websocket.api;
import java.util.ArrayList;
import java.util.List;
/**
* Settings for WebSocket operations.
*/
@ -107,13 +104,6 @@ public class WebSocketPolicy
*/
private final WebSocketBehavior behavior;
public static interface PolicyUpdate
{
public void onPolicyUpdate(WebSocketPolicy policy);
}
private List<PolicyUpdate> listeners = new ArrayList<>();
public WebSocketPolicy(WebSocketBehavior behavior)
{
this.behavior = behavior;
@ -173,24 +163,6 @@ public class WebSocketPolicy
return clone;
}
public void addListener(PolicyUpdate update)
{
this.listeners.add(update);
}
public void removeListener(PolicyUpdate update)
{
this.listeners.remove(update);
}
private void notifyOfUpdate()
{
for(PolicyUpdate update: listeners)
{
update.onPolicyUpdate(this);
}
}
/**
* The timeout in ms (milliseconds) for async write operations.
* <p>
@ -284,11 +256,8 @@ public class WebSocketPolicy
*/
public void setAsyncWriteTimeout(long ms)
{
boolean dirty = (this.asyncWriteTimeout != ms);
assertLessThan("AsyncWriteTimeout",ms,"IdleTimeout",idleTimeout);
this.asyncWriteTimeout = ms;
if (dirty)
notifyOfUpdate();
}
/**
@ -301,11 +270,8 @@ public class WebSocketPolicy
{
if(ms < -1) return; // no change (likely came from annotation)
boolean dirty = (this.idleTimeout != ms);
assertGreaterThan("IdleTimeout",ms,0);
this.idleTimeout = ms;
if (dirty)
notifyOfUpdate();
}
/**
@ -318,14 +284,11 @@ public class WebSocketPolicy
{
if(size < 0) return; // no change (likely came from annotation)
boolean dirty = (this.inputBufferSize != size);
assertGreaterThan("InputBufferSize",size,1);
assertLessThan("InputBufferSize",size,"MaxTextMessageBufferSize",maxTextMessageBufferSize);
assertLessThan("InputBufferSize",size,"MaxBinaryMessageBufferSize",maxBinaryMessageBufferSize);
this.inputBufferSize = size;
if(dirty)
notifyOfUpdate();
}
/**
@ -338,12 +301,18 @@ public class WebSocketPolicy
*/
public void setMaxBinaryMessageBufferSize(int size)
{
boolean dirty = (this.maxBinaryMessageBufferSize != size);
assertGreaterThan("MaxBinaryMessageBufferSize",size,1);
this.maxBinaryMessageBufferSize = size;
if(dirty)
notifyOfUpdate();
}
public void setMaxBinaryMessageSize(long size)
{
if (size > Integer.MAX_VALUE)
{
throw new IllegalArgumentException("This implementation does not support binary message sizes over " + Integer.MAX_VALUE);
}
this.setMaxBinaryMessageSize((int) size);
}
/**
@ -358,12 +327,9 @@ public class WebSocketPolicy
{
if(size < 0) return; // no change (likely came from annotation)
boolean dirty = (this.maxBinaryMessageSize != size);
assertGreaterThan("MaxBinaryMessageSize",size,1);
this.maxBinaryMessageSize = size;
if(dirty)
notifyOfUpdate();
}
/**
@ -376,12 +342,18 @@ public class WebSocketPolicy
*/
public void setMaxTextMessageBufferSize(int size)
{
boolean dirty = (this.maxTextMessageBufferSize != size);
assertGreaterThan("MaxTextMessageBufferSize",size,1);
this.maxTextMessageBufferSize = size;
if(dirty)
notifyOfUpdate();
}
public void setMaxTextMessageSize(long size)
{
if (size > Integer.MAX_VALUE)
{
throw new IllegalArgumentException("This implementation does not support text message sizes over " + Integer.MAX_VALUE);
}
this.setMaxTextMessageSize((int) size);
}
/**
@ -396,12 +368,9 @@ public class WebSocketPolicy
{
if(size < 0) return; // no change (likely came from annotation)
boolean dirty = (this.maxTextMessageSize != size);
assertGreaterThan("MaxTextMessageSize",size,1);
this.maxTextMessageSize = size;
if(dirty)
notifyOfUpdate();
}
@Override

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -65,7 +66,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
{
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
private final WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
private final WebSocketPolicy containerPolicy;
private final SslContextFactory sslContextFactory;
private final WebSocketExtensionFactory extensionRegistry;
private boolean daemon = false;
@ -113,7 +114,27 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
{
this(sslContextFactory, scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
this.containerPolicy = scope.getPolicy();
this.sslContextFactory = sslContextFactory;
this.objectFactory = scope.getObjectFactory();
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.masker = new RandomMasker();
setExecutor(scope.getExecutor());
setBufferPool(scope.getBufferPool());
if (scope instanceof LifeCycle)
{
addBean(scope);
}
else
{
if (sslContextFactory != null)
addBean(sslContextFactory);
addBean(this.executor);
addBean(this.sslContextFactory);
addBean(this.bufferPool);
}
}
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
@ -123,18 +144,17 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
{
this.containerPolicy = WebSocketPolicy.newClientPolicy();
this.sslContextFactory = sslContextFactory;
if(sslContextFactory!=null)
addBean(sslContextFactory);
this.objectFactory = objectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.masker = new RandomMasker();
setExecutor(executor);
setBufferPool(bufferPool);
this.objectFactory = objectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.masker = new RandomMasker();
if(sslContextFactory!=null)
addBean(sslContextFactory);
addBean(this.executor);
addBean(this.sslContextFactory);
addBean(this.bufferPool);
@ -291,7 +311,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getAsyncWriteTimeout()
{
return this.policy.getAsyncWriteTimeout();
return this.containerPolicy.getAsyncWriteTimeout();
}
public SocketAddress getBindAddress()
@ -341,7 +361,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public int getMaxBinaryMessageBufferSize()
{
return this.policy.getMaxBinaryMessageBufferSize();
return this.containerPolicy.getMaxBinaryMessageBufferSize();
}
/**
@ -351,7 +371,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getMaxBinaryMessageSize()
{
return this.policy.getMaxBinaryMessageSize();
return this.containerPolicy.getMaxBinaryMessageSize();
}
/**
@ -361,7 +381,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getMaxIdleTimeout()
{
return this.policy.getIdleTimeout();
return this.containerPolicy.getIdleTimeout();
}
/**
@ -371,7 +391,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public int getMaxTextMessageBufferSize()
{
return this.policy.getMaxTextMessageBufferSize();
return this.containerPolicy.getMaxTextMessageBufferSize();
}
/**
@ -381,7 +401,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public long getMaxTextMessageSize()
{
return this.policy.getMaxTextMessageSize();
return this.containerPolicy.getMaxTextMessageSize();
}
@Override
@ -397,7 +417,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
public WebSocketPolicy getPolicy()
{
return this.policy;
return this.containerPolicy;
}
public Scheduler getScheduler()
@ -475,7 +495,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
public void setAsyncWriteTimeout(long ms)
{
this.policy.setAsyncWriteTimeout(ms);
this.containerPolicy.setAsyncWriteTimeout(ms);
}
/**
@ -543,7 +563,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
public void setMaxBinaryMessageBufferSize(int max)
{
this.policy.setMaxBinaryMessageBufferSize(max);
this.containerPolicy.setMaxBinaryMessageBufferSize(max);
}
/**
@ -556,12 +576,12 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public void setMaxIdleTimeout(long ms)
{
this.policy.setIdleTimeout(ms);
this.containerPolicy.setIdleTimeout(ms);
}
public void setMaxTextMessageBufferSize(int max)
{
this.policy.setMaxTextMessageBufferSize(max);
this.containerPolicy.setMaxTextMessageBufferSize(max);
}
public void setSessionFactory(SessionFactory sessionFactory)

View File

@ -307,14 +307,14 @@ public class UpgradeConnection extends AbstractConnection implements Connection.
Executor executor = getExecutor();
Object websocket = connectPromise.getWebSocketEndpoint();
WebSocketPolicy policy = connectPromise.getClient().getPolicy().clonePolicy();
WebSocketPolicy policy = connectPromise.getClient().getPolicy();
// Establish Connection
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
// Create WebSocket Session
SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,policy,connection);
session.setUpgradeRequest(request);
session.setUpgradeResponse(response);

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;

View File

@ -23,7 +23,6 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.io.IOState;
@ -99,12 +98,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
long getMaxIdleTimeout();
/**
* The policy that the connection is running under.
* @return the policy for the connection
*/
WebSocketPolicy getPolicy();
/**
* Get the remote Address in use for this connection.
* <p>

View File

@ -20,12 +20,28 @@ package org.eclipse.jetty.websocket.common;
import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
/**
* Interface for creating jetty {@link WebSocketSession} objects.
*/
public interface SessionFactory
{
public boolean supports(Object websocket);
/**
* Does this implementation support this object type
* @param websocket the object instance
* @return true if this SessionFactory supports this object type
*/
boolean supports(Object websocket);
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection);
/**
* Create a new WebSocketSession from the provided information
*
* @param requestURI
* @param websocket
* @param policy
* @param connection
* @return
*/
WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection);
}

View File

@ -268,7 +268,11 @@ public abstract class WebSocketFrame implements Frame
return (byte)(finRsvOp & 0x80) != 0;
}
/**
* @deprecated use {@link #isFin()} instead
*/
@Override
@Deprecated
public boolean isLast()
{
return isFin();

View File

@ -69,11 +69,13 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
@ManagedObject("A Jetty WebSocket Session")
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory,
WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
private final WebSocketContainerScope containerScope;
private final WebSocketPolicy policy;
private final URI requestURI;
private final LogicalConnection connection;
private final Executor executor;
@ -93,11 +95,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private Map<String, String[]> parameterMap = new HashMap<>();
private RemoteEndpoint remote;
private OutgoingFrames outgoingHandler;
private WebSocketPolicy policy;
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, LogicalConnection connection)
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, WebSocketPolicy policy, LogicalConnection connection)
{
Objects.requireNonNull(containerScope, "Container Scope cannot be null");
Objects.requireNonNull(requestURI, "Request URI cannot be null");
@ -110,14 +111,14 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
this.connection.getIOState().addListener(this);
this.policy = connection.getPolicy();
this.policy = policy;
addBean(this.connection);
}
public EndpointFunctions newEndpointFunctions(Object endpoint)
{
return new CommonEndpointFunctions(endpoint, this.policy, this.executor);
return new CommonEndpointFunctions(endpoint, getPolicy(), this.executor);
}
@Override
@ -312,7 +313,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override
public WebSocketPolicy getPolicy()
{
return policy;
return this.policy;
}
@Override
@ -325,7 +326,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
public RemoteEndpoint getRemote()
{
if (LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.getRemote()", policy.getBehavior(), this.getClass().getSimpleName());
LOG_OPEN.debug("[{}] {}.getRemote()", getPolicy().getBehavior(), this.getClass().getSimpleName());
ConnectionState state = connection.getIOState().getConnectionState();
if ((state == ConnectionState.OPEN) || (state == ConnectionState.CONNECTED))
@ -491,7 +492,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
notifyError(cause);
// Unhandled Error, close the connection.
switch (policy.getBehavior())
switch (getPolicy().getBehavior())
{
case SERVER:
close(StatusCode.SERVER_ERROR, cause.getClass().getSimpleName());
@ -557,7 +558,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
public void onOpened(Connection connection)
{
if (LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.onOpened()", policy.getBehavior(), this.getClass().getSimpleName());
LOG_OPEN.debug("[{}] {}.onOpened()", getPolicy().getBehavior(), this.getClass().getSimpleName());
open();
}
@ -610,7 +611,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
public void open()
{
if (LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.open()", policy.getBehavior(), this.getClass().getSimpleName());
LOG_OPEN.debug("[{}] {}.open()", getPolicy().getBehavior(), this.getClass().getSimpleName());
if (remote != null)
{
@ -626,7 +627,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
// Connect remote
remote = remoteEndpointFactory.newRemoteEndpoint(connection,outgoingHandler,getBatchMode());
if (LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.open() remote={}", policy.getBehavior(), this.getClass().getSimpleName(), remote);
LOG_OPEN.debug("[{}] {}.open() remote={}", getPolicy().getBehavior(), this.getClass().getSimpleName(), remote);
// Open WebSocket
endpointFunctions.onOpen(this);
@ -654,7 +655,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
// Exception on end-user WS-Endpoint.
// Fast-fail & close connection with reason.
int statusCode = StatusCode.SERVER_ERROR;
if (policy.getBehavior() == WebSocketBehavior.CLIENT)
if (getPolicy().getBehavior() == WebSocketBehavior.CLIENT)
{
statusCode = StatusCode.POLICY_VIOLATION;
}
@ -681,11 +682,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
this.outgoingHandler = outgoing;
}
public void setPolicy(WebSocketPolicy policy)
{
this.policy = policy;
}
public void setUpgradeRequest(UpgradeRequest request)
{
this.upgradeRequest = request;

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common;
import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -43,8 +44,8 @@ public class WebSocketSessionFactory implements SessionFactory
}
@Override
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
{
return new WebSocketSession(containerScope, requestURI,websocket,connection);
return new WebSocketSession(containerScope, requestURI, websocket, policy, connection);
}
}

View File

@ -44,8 +44,8 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketPolicy.PolicyUpdate;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -59,15 +59,15 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, PolicyUpdate, Dumpable
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
{
private final AtomicBoolean closed = new AtomicBoolean();
private class Flusher extends FrameFlusher
{
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
private Flusher(ByteBufferPool bufferPool, int bufferSize, Generator generator, EndPoint endpoint)
{
super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
super(bufferPool,generator,endpoint,bufferSize,8);
}
@Override
@ -194,7 +194,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
private static enum ReadMode
private enum ReadMode
{
PARSE,
DISCARD,
@ -214,7 +214,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final Scheduler scheduler;
private final Generator generator;
private final Parser parser;
private final WebSocketPolicy policy;
private final WebSocketBehavior behavior;
private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private final String id;
@ -233,7 +233,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
endp.getLocalAddress().getPort(),
endp.getRemoteAddress().getAddress().getHostAddress(),
endp.getRemoteAddress().getPort());
this.policy = policy;
this.behavior = policy.getBehavior();
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy,bufferPool);
@ -242,15 +242,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState();
this.ioState.addListener(this);
this.flusher = new Flusher(bufferPool,generator,endp);
this.setInputBufferSize(policy.getInputBufferSize());
this.setMaxIdleTimeout(policy.getIdleTimeout());
this.policy.addListener(this);
}
@Override
public void onPolicyUpdate(WebSocketPolicy policy)
{
this.flusher = new Flusher(bufferPool,policy.getMaxBinaryMessageBufferSize(),generator,endp);
this.setInputBufferSize(policy.getInputBufferSize());
this.setMaxIdleTimeout(policy.getIdleTimeout());
}
@ -302,14 +294,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void disconnect()
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
LOG_CLOSE.debug("{} disconnect()",behavior);
disconnect(false);
}
private void disconnect(boolean onlyOutput)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
LOG_CLOSE.debug("{} disconnect({})",behavior,onlyOutput?"outputOnly":"both");
// close FrameFlusher, we cannot write anymore at this point.
flusher.close();
EndPoint endPoint = getEndPoint();
@ -398,12 +390,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return parser;
}
@Override
public WebSocketPolicy getPolicy()
{
return this.policy;
}
@Override
public InetSocketAddress getRemoteAddress()
{
@ -441,9 +427,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onClose()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClose()",policy.getBehavior());
LOG.debug("{} onClose()",behavior);
super.onClose();
policy.removeListener(this);
ioState.onDisconnected();
flusher.close();
}
@ -452,7 +437,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onConnectionStateChange(ConnectionState state)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
LOG_CLOSE.debug("{} Connection State Change: {}",behavior,state);
switch (state)
{
@ -507,7 +492,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable()",policy.getBehavior());
LOG.debug("{} onFillable()",behavior);
stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
@ -572,7 +557,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen()
{
if(LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
LOG_OPEN.debug("[{}] {}.onOpened()",behavior,this.getClass().getSimpleName());
super.onOpen();
this.ioState.onOpened();
}
@ -586,7 +571,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
IOState state = getIOState();
ConnectionState cstate = state.getConnectionState();
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
LOG_CLOSE.debug("{} Read Timeout - {}",behavior,cstate);
if (cstate == ConnectionState.CLOSED)
{

View File

@ -33,23 +33,23 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
{
private final ByteBufferPool bufferPool;
private final DecoratedObjectFactory objectFactory;
private final WebSocketPolicy policy;
private final WebSocketPolicy containerPolicy;
private Executor executor;
private SslContextFactory sslContextFactory;
public SimpleContainerScope(WebSocketPolicy policy)
public SimpleContainerScope(WebSocketPolicy containerPolicy)
{
this(policy, new MappedByteBufferPool(), new DecoratedObjectFactory());
this(containerPolicy, new MappedByteBufferPool(), new DecoratedObjectFactory());
}
public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool)
public SimpleContainerScope(WebSocketPolicy containerPolicy, ByteBufferPool bufferPool)
{
this(policy, bufferPool, new DecoratedObjectFactory());
this(containerPolicy, bufferPool, new DecoratedObjectFactory());
}
public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
public SimpleContainerScope(WebSocketPolicy containerPolicy, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
{
this.policy = policy;
this.containerPolicy = containerPolicy;
this.bufferPool = bufferPool;
this.objectFactory = objectFactory;
@ -102,7 +102,7 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
@Override
public WebSocketPolicy getPolicy()
{
return this.policy;
return this.containerPolicy;
}
@Override

View File

@ -36,48 +36,47 @@ public interface WebSocketContainerScope
*
* @return the buffer pool (never null)
*/
public ByteBufferPool getBufferPool();
ByteBufferPool getBufferPool();
/**
* Executor in use by the container.
*
* @return the Executor in use by the container.
*/
public Executor getExecutor();
Executor getExecutor();
/**
* Object Factory used to create objects.
*
* @return Object Factory used to create instances of objects.
*/
public DecoratedObjectFactory getObjectFactory();
DecoratedObjectFactory getObjectFactory();
/**
* The policy the container is running on.
*
* @return the websocket policy
*/
public WebSocketPolicy getPolicy();
WebSocketPolicy getPolicy();
/**
* The SslContextFactory in use by the container.
*
* @return the SslContextFactory in use by the container (can be null if no SSL context is defined)
*/
public SslContextFactory getSslContextFactory();
SslContextFactory getSslContextFactory();
/**
* A Session has been opened
*
* @param session the session that was opened
*/
public void onSessionOpened(WebSocketSession session);
void onSessionOpened(WebSocketSession session);
/**
* A Session has been closed
*
* @param session the session that was closed
*/
public void onSessionClosed(WebSocketSession session);
void onSessionClosed(WebSocketSession session);
}

View File

@ -18,6 +18,9 @@
package examples;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.io.InputStream;
import org.eclipse.jetty.websocket.api.Session;
@ -35,10 +38,7 @@ public class AnnotatedBinaryStreamSocket
@OnWebSocketMessage
public void onBinary(InputStream stream)
{
if (stream == null)
{
new RuntimeException("Stream cannot be null").printStackTrace(System.err);
}
assertThat("InputStream", stream, notNullValue());
capture.add("onBinary(%s)",stream);
}

View File

@ -142,12 +142,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return 0;
}
@Override
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public InetSocketAddress getRemoteAddress()
{

View File

@ -34,6 +34,7 @@ public class LocalWebSocketSession extends WebSocketSession
public LocalWebSocketSession(WebSocketContainerScope containerScope, TestName testname, Object websocket)
{
super(containerScope,URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),websocket,
containerScope.getPolicy(),
new LocalWebSocketConnection(testname,containerScope.getBufferPool()));
this.id = testname.getMethodName();
outgoingCapture = new OutgoingFramesCapture();

View File

@ -90,7 +90,6 @@ public class MessageOutputStreamTest
TrackingSocket localSocket = new TrackingSocket("local");
session = new LocalWebSocketSession(containerScope,testname,localSocket);
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// start session

View File

@ -82,7 +82,7 @@ public class MessageWriterTest
remoteSocket = new TrackingSocket("remote");
URI remoteURI = new URI("ws://localhost/remote");
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(bufferPool);
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,remoteConnection);
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,policy,remoteConnection);
OutgoingFrames socketPipe = FramePipes.to(remoteSession);
remoteSession.start();
remoteSession.open();
@ -91,9 +91,8 @@ public class MessageWriterTest
TrackingSocket localSocket = new TrackingSocket("local");
URI localURI = new URI("ws://localhost/local");
LocalWebSocketConnection localConnection = new LocalWebSocketConnection(bufferPool);
session = new WebSocketSession(containerScope,localURI,localSocket,localConnection);
session = new WebSocketSession(containerScope,localURI,localSocket,policy,localConnection);
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// start session

View File

@ -109,12 +109,6 @@ public class DummyConnection implements LogicalConnection
return 0;
}
@Override
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public InetSocketAddress getRemoteAddress()
{

View File

@ -90,7 +90,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
private final Scheduler scheduler = new ScheduledExecutorScheduler();
private final List<WebSocketSession.Listener> listeners = new CopyOnWriteArrayList<>();
private final String supportedVersions;
private final WebSocketPolicy defaultPolicy;
private final WebSocketPolicy containerPolicy;
private final ByteBufferPool bufferPool;
private final WebSocketExtensionFactory extensionFactory;
private Executor executor;
@ -125,7 +125,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.registeredSocketClasses = new ArrayList<>();
this.defaultPolicy = policy;
this.containerPolicy = policy;
this.bufferPool = bufferPool;
this.extensionFactory = new WebSocketExtensionFactory(this);
@ -255,7 +255,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{
try
{
return impl.createSession(requestURI, websocket, connection);
return impl.createSession(requestURI, websocket, containerPolicy, connection);
}
catch (Throwable e)
{
@ -342,7 +342,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
@Override
public WebSocketPolicy getPolicy()
{
return defaultPolicy;
return containerPolicy;
}
@Override
@ -583,7 +583,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Setup websocket connection
AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool);
extensionStack.setPolicy(wsConnection.getPolicy());
extensionStack.setPolicy(containerPolicy);
extensionStack.configure(wsConnection.getParser());
extensionStack.configure(wsConnection.getGenerator());