Issue #207 - Careful distinction between container policy, vs session policy

# Conflicts:
#	jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
This commit is contained in:
Joakim Erdfelt 2017-02-28 13:05:29 -07:00
parent 70d4850ed1
commit 1c4d658be2
32 changed files with 166 additions and 131 deletions

View File

@ -103,13 +103,14 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
} }
public EndpointFunctions newJsrEndpointFunction(Object endpoint, public EndpointFunctions newJsrEndpointFunction(Object endpoint,
WebSocketPolicy sessionPolicy,
AvailableEncoders availableEncoders, AvailableEncoders availableEncoders,
AvailableDecoders availableDecoders, AvailableDecoders availableDecoders,
Map<String, String> pathParameters, Map<String, String> pathParameters,
EndpointConfig config) EndpointConfig config)
{ {
return new JsrEndpointFunctions(endpoint, return new JsrEndpointFunctions(endpoint,
getPolicy(), sessionPolicy,
getExecutor(), getExecutor(),
availableEncoders, availableEncoders,
availableDecoders, availableDecoders,

View File

@ -42,7 +42,6 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -71,9 +70,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private JsrAsyncRemote asyncRemote; private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote; private JsrBasicRemote basicRemote;
public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection) public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, LogicalConnection connection)
{ {
super(container, requestURI, websocket, policy, connection); super(container, requestURI, websocket, connection);
this.container = container; this.container = container;
@ -100,6 +99,7 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
// Delegate to container to obtain correct version of JsrEndpointFunctions // Delegate to container to obtain correct version of JsrEndpointFunctions
// Could be a Client version, or a Server version // Could be a Client version, or a Server version
return container.newJsrEndpointFunction(endpoint, return container.newJsrEndpointFunction(endpoint,
getPolicy(),
availableEncoders, availableEncoders,
availableDecoders, availableDecoders,
pathParameters, pathParameters,

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.jsr356;
import java.net.URI; import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionFactory; import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -35,9 +34,9 @@ public class JsrSessionFactory implements SessionFactory
} }
@Override @Override
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection) public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
{ {
return new JsrSession(container,connection.getId(),requestURI,websocket,policy,connection); return new JsrSession(container,connection.getId(),requestURI,websocket,connection);
} }
@Override @Override

View File

@ -63,7 +63,7 @@ public class JsrSessionTest
ConfiguredEndpoint ei = new ConfiguredEndpoint(new DummyEndpoint(), config); ConfiguredEndpoint ei = new ConfiguredEndpoint(new DummyEndpoint(), config);
// Session // Session
session = new JsrSession(container, id, requestURI, ei, policy, connection); session = new JsrSession(container, id, requestURI, ei, connection);
session.start(); session.start();
} }

View File

@ -75,7 +75,7 @@ public class JsrEndpointFunctions_OnCloseTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
private void assertOnCloseInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception private void assertOnCloseInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception

View File

@ -72,7 +72,7 @@ public class JsrEndpointFunctions_OnErrorTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
private void assertOnErrorInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception private void assertOnErrorInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception

View File

@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryStreamTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")

View File

@ -84,7 +84,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception

View File

@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_TextStreamTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")

View File

@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_TextTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
private void onText(TrackingSocket socket, String msg) throws Exception private void onText(TrackingSocket socket, String msg) throws Exception

View File

@ -71,7 +71,7 @@ public class JsrEndpointFunctions_OnOpenTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
private void assertOnOpenInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception private void assertOnOpenInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions; import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.jsr356.ClientContainer; import org.eclipse.jetty.websocket.jsr356.ClientContainer;
@ -187,7 +188,7 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
pathParameters.put(variable, "0"); pathParameters.put(variable, "0");
} }
endpointFunctions = newJsrEndpointFunction(endpoint, availableEncoders, availableDecoders, pathParameters, config); endpointFunctions = newJsrEndpointFunction(endpoint, getPolicy(), availableEncoders, availableDecoders, pathParameters, config);
endpointFunctions.start(); // this should trigger an exception if endpoint is invalid. endpointFunctions.start(); // this should trigger an exception if endpoint is invalid.
} }
catch (InstantiationException e) catch (InstantiationException e)
@ -221,13 +222,14 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
@Override @Override
public EndpointFunctions newJsrEndpointFunction(Object endpoint, public EndpointFunctions newJsrEndpointFunction(Object endpoint,
WebSocketPolicy sessionPolicy,
AvailableEncoders availableEncoders, AvailableEncoders availableEncoders,
AvailableDecoders availableDecoders, AvailableDecoders availableDecoders,
Map<String, String> pathParameters, Map<String, String> pathParameters,
EndpointConfig config) EndpointConfig config)
{ {
return new JsrServerEndpointFunctions(endpoint, return new JsrServerEndpointFunctions(endpoint,
getPolicy(), sessionPolicy,
getExecutor(), getExecutor(),
availableEncoders, availableEncoders,
availableDecoders, availableDecoders,

View File

@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.containsString;
import java.net.URI; import java.net.URI;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Queue; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -82,6 +82,7 @@ public class AnnotatedServerEndpointTest
{ {
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
URI uri = server.getServerBaseURI().resolve("echo"); URI uri = server.getServerBaseURI().resolve("echo");
ClientUpgradeRequest req = new ClientUpgradeRequest(); ClientUpgradeRequest req = new ClientUpgradeRequest();
req.setSubProtocols("echo"); req.setSubProtocols("echo");
@ -90,9 +91,9 @@ public class AnnotatedServerEndpointTest
foo.get(1,TimeUnit.SECONDS); foo.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage(message); clientEcho.sendMessage(message);
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
String response = msgs.poll(); String response = msgs.get(0);
for (String expected : expectedTexts) for (String expected : expectedTexts)
{ {
Assert.assertThat("Expected message",response,containsString(expected)); Assert.assertThat("Expected message",response,containsString(expected));

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.jsr356.server; package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI; import java.net.URI;
import java.util.Queue; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -69,12 +69,13 @@ public class BasicEndpointTest
{ {
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
Future<Session> future = client.connect(clientEcho,uri.resolve("echo")); Future<Session> future = client.connect(clientEcho,uri.resolve("echo"));
// wait for connect // wait for connect
future.get(1,TimeUnit.SECONDS); future.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage("Hello World"); clientEcho.sendMessage("Hello World");
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
Assert.assertEquals("Expected message","Hello World",msgs.poll()); Assert.assertEquals("Expected message","Hello World",msgs.get(0));
} }
finally finally
{ {

View File

@ -18,12 +18,14 @@
package org.eclipse.jetty.websocket.jsr356.server; package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import java.net.URI; import java.net.URI;
import java.util.Queue; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -32,7 +34,9 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.webapp.WebAppContext; import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule; import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
import org.eclipse.jetty.websocket.jsr356.server.samples.idletimeout.IdleTimeoutContextListener; import org.eclipse.jetty.websocket.jsr356.server.samples.idletimeout.IdleTimeoutContextListener;
@ -80,6 +84,8 @@ public class IdleTimeoutTest
{ {
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Client Attempting to connect"); LOG.debug("Client Attempting to connect");
Future<Session> future = client.connect(clientEcho,uri); Future<Session> future = client.connect(clientEcho,uri);
@ -93,18 +99,26 @@ public class IdleTimeoutTest
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Waited 1 second"); LOG.debug("Waited 1 second");
if (clientEcho.getClosed() == false)
// Try to write
clientEcho.sendMessage("You shouldn't be there");
try
{ {
// Try to write List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
clientEcho.sendMessage("You shouldn't be there"); assertThat("Should not have received messages echoed back",msgs,is(empty()));
try }
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if(cause instanceof CloseException)
{ {
Queue<String> msgs = clientEcho.awaitMessages(1); CloseException ce = (CloseException) cause;
assertThat("Should not have received messages echoed back",msgs,is(empty())); assertThat("CloseException.statusCode", ce.getStatusCode(), is(StatusCode.SHUTDOWN));
assertThat("CloseException.reason", ce.getMessage(), containsString("Idle Timeout"));
} }
catch (TimeoutException | InterruptedException e) else
{ {
// valid success path throw e;
} }
} }
} }

View File

@ -19,18 +19,19 @@
package org.eclipse.jetty.websocket.jsr356.server; package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.ArrayList;
import java.util.concurrent.TimeUnit; import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock; import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
@ -46,43 +47,29 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
public class JettyEchoSocket public class JettyEchoSocket
{ {
private static final Logger LOG = Log.getLogger(JettyEchoSocket.class); private static final Logger LOG = Log.getLogger(JettyEchoSocket.class);
@SuppressWarnings("unused")
private Session session;
private Lock remoteLock = new ReentrantLock();
private RemoteEndpoint remote; private RemoteEndpoint remote;
private EventQueue<String> incomingMessages = new EventQueue<>(); private CompletableFuture<List<String>> expectedMessagesFuture;
private AtomicInteger expectedMessageCount;
private List<String> messages = new ArrayList<>();
public Queue<String> awaitMessages(int expected) throws TimeoutException, InterruptedException public Future<List<String>> expectedMessages(int expected)
{ {
incomingMessages.awaitEventCount(expected,2,TimeUnit.SECONDS); expectedMessagesFuture = new CompletableFuture<>();
return incomingMessages; expectedMessageCount = new AtomicInteger(expected);
} return expectedMessagesFuture;
public boolean getClosed()
{
remoteLock.lock();
try
{
return (remote == null);
}
finally
{
remoteLock.unlock();
}
} }
@OnWebSocketClose @OnWebSocketClose
public void onClose(int code, String reason) public void onClose(int code, String reason)
{ {
session = null; remote = null;
remoteLock.lock(); synchronized (expectedMessagesFuture)
try
{ {
remote = null; if ((code != StatusCode.NORMAL) ||
} (code != StatusCode.NO_CODE))
finally {
{ expectedMessagesFuture.completeExceptionally(new CloseException(code, reason));
remoteLock.unlock(); }
} }
} }
@ -90,48 +77,43 @@ public class JettyEchoSocket
public void onError(Throwable t) public void onError(Throwable t)
{ {
LOG.warn(t); LOG.warn(t);
synchronized (expectedMessagesFuture)
{
expectedMessagesFuture.completeExceptionally(t);
}
} }
@OnWebSocketMessage @OnWebSocketMessage
public void onMessage(String msg) throws IOException public void onMessage(String msg) throws IOException
{ {
incomingMessages.add(msg); messages.add(msg);
synchronized (expectedMessagesFuture)
{
int countLeft = expectedMessageCount.decrementAndGet();
if (countLeft <= 0)
{
expectedMessagesFuture.complete(messages);
}
}
sendMessage(msg); sendMessage(msg);
} }
@OnWebSocketConnect @OnWebSocketConnect
public void onOpen(Session session) public void onOpen(Session session)
{ {
this.session = session; this.remote = session.getRemote();
remoteLock.lock();
try
{
this.remote = session.getRemote();
}
finally
{
remoteLock.unlock();
}
} }
public void sendMessage(String msg) throws IOException public void sendMessage(String msg) throws IOException
{ {
remoteLock.lock(); RemoteEndpoint r = remote;
try if (r == null)
{ {
RemoteEndpoint r = remote; return;
if (r == null) }
{
return;
}
r.sendStringByFuture(msg); r.sendStringByFuture(msg);
if (r.getBatchMode() == BatchMode.ON) if (r.getBatchMode() == BatchMode.ON)
r.flush(); r.flush();
}
finally
{
remoteLock.unlock();
}
} }
} }

View File

@ -83,7 +83,7 @@ public class JsrServerEndpointFunctions_OnMessage_TextStreamTest
DummyConnection connection = new DummyConnection(policy); DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig(); ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection); return new JsrSession(container, id, requestURI, ei, connection);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Queue; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -48,6 +48,7 @@ public class LargeAnnotatedTest
@Rule @Rule
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test"); public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
@SuppressWarnings("Duplicates")
@Test @Test
public void testEcho() throws Exception public void testEcho() throws Exception
{ {
@ -70,6 +71,7 @@ public class LargeAnnotatedTest
client.getPolicy().setMaxTextMessageSize(128*1024); client.getPolicy().setMaxTextMessageSize(128*1024);
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large")); Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large"));
// wait for connect // wait for connect
foo.get(1,TimeUnit.SECONDS); foo.get(1,TimeUnit.SECONDS);
@ -78,8 +80,8 @@ public class LargeAnnotatedTest
Arrays.fill(txt,(byte)'o'); Arrays.fill(txt,(byte)'o');
String msg = new String(txt,StandardCharsets.UTF_8); String msg = new String(txt,StandardCharsets.UTF_8);
clientEcho.sendMessage(msg); clientEcho.sendMessage(msg);
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
Assert.assertEquals("Expected message",msg,msgs.poll()); Assert.assertEquals("Expected message",msg,msgs.get(0));
} }
finally finally
{ {

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Queue; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -48,6 +48,7 @@ public class LargeContainerTest
@Rule @Rule
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test"); public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
@SuppressWarnings("Duplicates")
@Test @Test
public void testEcho() throws Exception public void testEcho() throws Exception
{ {
@ -70,6 +71,7 @@ public class LargeContainerTest
client.getPolicy().setMaxTextMessageSize(128*1024); client.getPolicy().setMaxTextMessageSize(128*1024);
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large")); Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large"));
// wait for connect // wait for connect
foo.get(1,TimeUnit.SECONDS); foo.get(1,TimeUnit.SECONDS);
@ -78,8 +80,8 @@ public class LargeContainerTest
Arrays.fill(txt,(byte)'o'); Arrays.fill(txt,(byte)'o');
String msg = new String(txt,StandardCharsets.UTF_8); String msg = new String(txt,StandardCharsets.UTF_8);
clientEcho.sendMessage(msg); clientEcho.sendMessage(msg);
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
Assert.assertEquals("Expected message",msg,msgs.poll()); Assert.assertEquals("Expected message",msg,msgs.get(0));
} }
finally finally
{ {

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.jsr356.server; package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI; import java.net.URI;
import java.util.Queue; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -61,12 +61,13 @@ public class OnMessageReturnTest
{ {
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
Future<Session> future = client.connect(clientEcho,uri.resolve("echoreturn")); Future<Session> future = client.connect(clientEcho,uri.resolve("echoreturn"));
// wait for connect // wait for connect
future.get(1,TimeUnit.SECONDS); future.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage("Hello World"); clientEcho.sendMessage("Hello World");
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
Assert.assertEquals("Expected message","Hello World",msgs.poll()); Assert.assertEquals("Expected message","Hello World",msgs.get(0));
} }
finally finally
{ {

View File

@ -24,7 +24,6 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -139,12 +138,13 @@ public class SessionTest
{ {
client.start(); client.start();
JettyEchoSocket clientEcho = new JettyEchoSocket(); JettyEchoSocket clientEcho = new JettyEchoSocket();
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
Future<Session> future = client.connect(clientEcho,serverUri.resolve(requestPath)); Future<Session> future = client.connect(clientEcho,serverUri.resolve(requestPath));
// wait for connect // wait for connect
future.get(1,TimeUnit.SECONDS); future.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage(requestMessage); clientEcho.sendMessage(requestMessage);
Queue<String> msgs = clientEcho.awaitMessages(1); List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
Assert.assertThat("Expected message",msgs.poll(),is(expectedResponse)); Assert.assertThat("Expected message",msgs.get(0),is(expectedResponse));
} }
finally finally
{ {

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.io.IOState; import org.eclipse.jetty.websocket.common.io.IOState;
@ -98,6 +99,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/ */
long getMaxIdleTimeout(); long getMaxIdleTimeout();
/**
* Get the Connection based WebSocket Policy.
*
* @return the WebSocket policy for the connection
*/
WebSocketPolicy getPolicy();
/** /**
* Get the remote Address in use for this connection. * Get the remote Address in use for this connection.
* <p> * <p>

View File

@ -20,8 +20,6 @@ package org.eclipse.jetty.websocket.common;
import java.net.URI; import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
/** /**
* Interface for creating jetty {@link WebSocketSession} objects. * Interface for creating jetty {@link WebSocketSession} objects.
*/ */
@ -39,9 +37,8 @@ public interface SessionFactory
* *
* @param requestURI * @param requestURI
* @param websocket * @param websocket
* @param policy
* @param connection * @param connection
* @return * @return
*/ */
WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection); WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection);
} }

View File

@ -100,7 +100,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private UpgradeResponse upgradeResponse; private UpgradeResponse upgradeResponse;
private CompletableFuture<Session> openFuture; private CompletableFuture<Session> openFuture;
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, WebSocketPolicy policy, LogicalConnection connection) public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, LogicalConnection connection)
{ {
Objects.requireNonNull(containerScope, "Container Scope cannot be null"); Objects.requireNonNull(containerScope, "Container Scope cannot be null");
Objects.requireNonNull(requestURI, "Request URI cannot be null"); Objects.requireNonNull(requestURI, "Request URI cannot be null");
@ -113,7 +113,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
this.executor = connection.getExecutor(); this.executor = connection.getExecutor();
this.outgoingHandler = connection; this.outgoingHandler = connection;
this.connection.getIOState().addListener(this); this.connection.getIOState().addListener(this);
this.policy = policy; this.policy = connection.getPolicy();
addBean(this.connection); addBean(this.connection);
} }

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common;
import java.net.URI; import java.net.URI;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -44,8 +43,8 @@ public class WebSocketSessionFactory implements SessionFactory
} }
@Override @Override
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection) public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
{ {
return new WebSocketSession(containerScope, requestURI, websocket, policy, connection); return new WebSocketSession(containerScope, requestURI, websocket, connection);
} }
} }

View File

@ -214,6 +214,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final Scheduler scheduler; private final Scheduler scheduler;
private final Generator generator; private final Generator generator;
private final Parser parser; private final Parser parser;
private final WebSocketPolicy policy;
private final WebSocketBehavior behavior; private final WebSocketBehavior behavior;
private final AtomicBoolean suspendToken; private final AtomicBoolean suspendToken;
private final FrameFlusher flusher; private final FrameFlusher flusher;
@ -233,6 +234,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
endp.getLocalAddress().getPort(), endp.getLocalAddress().getPort(),
endp.getRemoteAddress().getAddress().getHostAddress(), endp.getRemoteAddress().getAddress().getHostAddress(),
endp.getRemoteAddress().getPort()); endp.getRemoteAddress().getPort());
this.policy = policy;
this.behavior = policy.getBehavior(); this.behavior = policy.getBehavior();
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool); this.generator = new Generator(policy,bufferPool);
@ -390,6 +392,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return parser; return parser;
} }
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {

View File

@ -30,7 +30,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
public class StringMessageSink implements MessageSink public class StringMessageSink implements MessageSink
{ {
private static final Logger LOG = Log.getLogger(StringMessageSink.class); private static final Logger LOG = Log.getLogger(StringMessageSink.class);
private final WebSocketPolicy policy; private WebSocketPolicy policy;
private final Function<String, Void> onMessageFunction; private final Function<String, Void> onMessageFunction;
private Utf8StringBuilder utf; private Utf8StringBuilder utf;
private int size = 0; private int size = 0;

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState; import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.junit.rules.TestName; import org.junit.rules.TestName;
public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames, ConnectionStateListener public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames, ConnectionStateListener
@ -44,7 +45,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
private final String id; private final String id;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final Executor executor; private final Executor executor;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); private WebSocketPolicy policy;
private IncomingFrames incoming; private IncomingFrames incoming;
private IOState ioState = new IOState(); private IOState ioState = new IOState();
@ -59,6 +60,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.executor = new ExecutorThreadPool(); this.executor = new ExecutorThreadPool();
this.ioState.addListener(this); this.ioState.addListener(this);
this.policy = WebSocketPolicy.newServerPolicy();
} }
public LocalWebSocketConnection(TestName testname, ByteBufferPool bufferPool) public LocalWebSocketConnection(TestName testname, ByteBufferPool bufferPool)
@ -66,6 +68,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
this(testname.getMethodName(),bufferPool); this(testname.getMethodName(),bufferPool);
} }
public LocalWebSocketConnection(TestName testname, WebSocketContainerScope containerScope)
{
this(testname.getMethodName(), containerScope.getBufferPool());
this.policy = containerScope.getPolicy();
}
@Override @Override
public Executor getExecutor() public Executor getExecutor()
{ {
@ -142,6 +150,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return 0; return 0;
} }
@Override
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {

View File

@ -34,8 +34,7 @@ public class LocalWebSocketSession extends WebSocketSession
public LocalWebSocketSession(WebSocketContainerScope containerScope, TestName testname, Object websocket) public LocalWebSocketSession(WebSocketContainerScope containerScope, TestName testname, Object websocket)
{ {
super(containerScope,URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),websocket, super(containerScope,URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),websocket,
containerScope.getPolicy(), new LocalWebSocketConnection(testname,containerScope));
new LocalWebSocketConnection(testname,containerScope.getBufferPool()));
this.id = testname.getMethodName(); this.id = testname.getMethodName();
outgoingCapture = new OutgoingFramesCapture(); outgoingCapture = new OutgoingFramesCapture();
setOutgoingHandler(outgoingCapture); setOutgoingHandler(outgoingCapture);

View File

@ -82,7 +82,7 @@ public class MessageWriterTest
remoteSocket = new TrackingSocket("remote"); remoteSocket = new TrackingSocket("remote");
URI remoteURI = new URI("ws://localhost/remote"); URI remoteURI = new URI("ws://localhost/remote");
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(bufferPool); LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(bufferPool);
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,policy,remoteConnection); remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,remoteConnection);
OutgoingFrames socketPipe = FramePipes.to(remoteSession); OutgoingFrames socketPipe = FramePipes.to(remoteSession);
remoteSession.start(); remoteSession.start();
remoteSession.open(); remoteSession.open();
@ -91,7 +91,7 @@ public class MessageWriterTest
TrackingSocket localSocket = new TrackingSocket("local"); TrackingSocket localSocket = new TrackingSocket("local");
URI localURI = new URI("ws://localhost/local"); URI localURI = new URI("ws://localhost/local");
LocalWebSocketConnection localConnection = new LocalWebSocketConnection(bufferPool); LocalWebSocketConnection localConnection = new LocalWebSocketConnection(bufferPool);
session = new WebSocketSession(containerScope,localURI,localSocket,policy,localConnection); session = new WebSocketSession(containerScope,localURI,localSocket,localConnection);
// talk to our remote socket // talk to our remote socket
session.setOutgoingHandler(socketPipe); session.setOutgoingHandler(socketPipe);

View File

@ -109,6 +109,12 @@ public class DummyConnection implements LogicalConnection
return 0; return 0;
} }
@Override
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {

View File

@ -265,7 +265,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{ {
try try
{ {
return impl.createSession(requestURI, websocket, containerPolicy, connection); return impl.createSession(requestURI, websocket, connection);
} }
catch (Throwable e) catch (Throwable e)
{ {
@ -562,7 +562,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Setup websocket connection // Setup websocket connection
AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool); AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool);
extensionStack.setPolicy(containerPolicy); extensionStack.setPolicy(wsConnection.getPolicy());
extensionStack.configure(wsConnection.getParser()); extensionStack.configure(wsConnection.getParser());
extensionStack.configure(wsConnection.getGenerator()); extensionStack.configure(wsConnection.getGenerator());