Issue #207 - Careful distinction between container policy, vs session policy
This commit is contained in:
parent
0316e18f91
commit
acf743fa6b
|
@ -99,13 +99,14 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
|
|||
}
|
||||
|
||||
public EndpointFunctions newJsrEndpointFunction(Object endpoint,
|
||||
WebSocketPolicy sessionPolicy,
|
||||
AvailableEncoders availableEncoders,
|
||||
AvailableDecoders availableDecoders,
|
||||
Map<String, String> pathParameters,
|
||||
EndpointConfig config)
|
||||
{
|
||||
return new JsrEndpointFunctions(endpoint,
|
||||
getPolicy(),
|
||||
sessionPolicy,
|
||||
getExecutor(),
|
||||
availableEncoders,
|
||||
availableDecoders,
|
||||
|
|
|
@ -42,7 +42,6 @@ import javax.websocket.WebSocketContainer;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
|
@ -71,9 +70,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
|
|||
private JsrAsyncRemote asyncRemote;
|
||||
private JsrBasicRemote basicRemote;
|
||||
|
||||
public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
|
||||
public JsrSession(ClientContainer container, String id, URI requestURI, Object websocket, LogicalConnection connection)
|
||||
{
|
||||
super(container, requestURI, websocket, policy, connection);
|
||||
super(container, requestURI, websocket, connection);
|
||||
|
||||
this.container = container;
|
||||
|
||||
|
@ -100,6 +99,7 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
|
|||
// Delegate to container to obtain correct version of JsrEndpointFunctions
|
||||
// Could be a Client version, or a Server version
|
||||
return container.newJsrEndpointFunction(endpoint,
|
||||
getPolicy(),
|
||||
availableEncoders,
|
||||
availableDecoders,
|
||||
pathParameters,
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.jsr356;
|
|||
|
||||
import java.net.URI;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.SessionFactory;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
|
@ -35,9 +34,9 @@ public class JsrSessionFactory implements SessionFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
|
||||
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
|
||||
{
|
||||
return new JsrSession(container,connection.getId(),requestURI,websocket,policy,connection);
|
||||
return new JsrSession(container,connection.getId(),requestURI,websocket,connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -63,7 +63,7 @@ public class JsrSessionTest
|
|||
ConfiguredEndpoint ei = new ConfiguredEndpoint(new DummyEndpoint(), config);
|
||||
|
||||
// Session
|
||||
session = new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
session = new JsrSession(container, id, requestURI, ei, connection);
|
||||
session.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ public class JsrEndpointFunctions_OnCloseTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
private void assertOnCloseInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception
|
||||
|
|
|
@ -72,7 +72,7 @@ public class JsrEndpointFunctions_OnErrorTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
private void assertOnErrorInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception
|
||||
|
|
|
@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryStreamTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
|
|
|
@ -84,7 +84,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception
|
||||
|
|
|
@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_TextStreamTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
|
|
|
@ -82,7 +82,7 @@ public class JsrEndpointFunctions_OnMessage_TextTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
private void onText(TrackingSocket socket, String msg) throws Exception
|
||||
|
|
|
@ -71,7 +71,7 @@ public class JsrEndpointFunctions_OnOpenTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
private void assertOnOpenInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception
|
||||
|
|
|
@ -35,6 +35,7 @@ import javax.websocket.server.ServerEndpointConfig;
|
|||
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
|
||||
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
|
||||
|
@ -182,7 +183,7 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
|
|||
pathParameters.put(variable, "0");
|
||||
}
|
||||
|
||||
endpointFunctions = newJsrEndpointFunction(endpoint, availableEncoders, availableDecoders, pathParameters, config);
|
||||
endpointFunctions = newJsrEndpointFunction(endpoint, getPolicy(), availableEncoders, availableDecoders, pathParameters, config);
|
||||
endpointFunctions.start(); // this should trigger an exception if endpoint is invalid.
|
||||
}
|
||||
catch (InstantiationException e)
|
||||
|
@ -216,13 +217,14 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
|
|||
|
||||
@Override
|
||||
public EndpointFunctions newJsrEndpointFunction(Object endpoint,
|
||||
WebSocketPolicy sessionPolicy,
|
||||
AvailableEncoders availableEncoders,
|
||||
AvailableDecoders availableDecoders,
|
||||
Map<String, String> pathParameters,
|
||||
EndpointConfig config)
|
||||
{
|
||||
return new JsrServerEndpointFunctions(endpoint,
|
||||
getPolicy(),
|
||||
sessionPolicy,
|
||||
getExecutor(),
|
||||
availableEncoders,
|
||||
availableDecoders,
|
||||
|
|
|
@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.containsString;
|
|||
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -82,6 +82,7 @@ public class AnnotatedServerEndpointTest
|
|||
{
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
URI uri = server.getServerBaseURI().resolve("echo");
|
||||
ClientUpgradeRequest req = new ClientUpgradeRequest();
|
||||
req.setSubProtocols("echo");
|
||||
|
@ -90,9 +91,9 @@ public class AnnotatedServerEndpointTest
|
|||
foo.get(1,TimeUnit.SECONDS);
|
||||
|
||||
clientEcho.sendMessage(message);
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
|
||||
String response = msgs.poll();
|
||||
String response = msgs.get(0);
|
||||
for (String expected : expectedTexts)
|
||||
{
|
||||
Assert.assertThat("Expected message",response,containsString(expected));
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.jsr356.server;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -69,12 +69,13 @@ public class BasicEndpointTest
|
|||
{
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
Future<Session> future = client.connect(clientEcho,uri.resolve("echo"));
|
||||
// wait for connect
|
||||
future.get(1,TimeUnit.SECONDS);
|
||||
clientEcho.sendMessage("Hello World");
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
Assert.assertEquals("Expected message","Hello World",msgs.poll());
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("Expected message","Hello World",msgs.get(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -18,13 +18,14 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.jsr356.server;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -35,7 +36,9 @@ import org.eclipse.jetty.toolchain.test.TestingDir;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
import org.eclipse.jetty.websocket.api.CloseException;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
|
||||
import org.eclipse.jetty.websocket.jsr356.server.samples.idletimeout.IdleTimeoutContextListener;
|
||||
|
@ -90,6 +93,8 @@ public class IdleTimeoutTest
|
|||
{
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Client Attempting to connnect");
|
||||
Future<Session> future = client.connect(clientEcho,uri);
|
||||
|
@ -103,18 +108,26 @@ public class IdleTimeoutTest
|
|||
TimeUnit.SECONDS.sleep(1);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Waited 1 second");
|
||||
if (clientEcho.getClosed() == false)
|
||||
|
||||
// Try to write
|
||||
clientEcho.sendMessage("You shouldn't be there");
|
||||
try
|
||||
{
|
||||
// Try to write
|
||||
clientEcho.sendMessage("You shouldn't be there");
|
||||
try
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
assertThat("Should not have received messages echoed back",msgs,is(empty()));
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
if(cause instanceof CloseException)
|
||||
{
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
assertThat("Should not have received messages echoed back",msgs,is(empty()));
|
||||
CloseException ce = (CloseException) cause;
|
||||
assertThat("CloseException.statusCode", ce.getStatusCode(), is(StatusCode.SHUTDOWN));
|
||||
assertThat("CloseException.reason", ce.getMessage(), containsString("Idle Timeout"));
|
||||
}
|
||||
catch (TimeoutException | InterruptedException e)
|
||||
else
|
||||
{
|
||||
// valid success path
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,18 +19,19 @@
|
|||
package org.eclipse.jetty.websocket.jsr356.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.CloseException;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
|
||||
|
@ -46,92 +47,73 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
|||
public class JettyEchoSocket
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(JettyEchoSocket.class);
|
||||
@SuppressWarnings("unused")
|
||||
private Session session;
|
||||
private Lock remoteLock = new ReentrantLock();
|
||||
private RemoteEndpoint remote;
|
||||
private EventQueue<String> incomingMessages = new EventQueue<>();
|
||||
|
||||
public Queue<String> awaitMessages(int expected) throws TimeoutException, InterruptedException
|
||||
private CompletableFuture<List<String>> expectedMessagesFuture;
|
||||
private AtomicInteger expectedMessageCount;
|
||||
private List<String> messages = new ArrayList<>();
|
||||
|
||||
public Future<List<String>> expectedMessages(int expected)
|
||||
{
|
||||
incomingMessages.awaitEventCount(expected,2,TimeUnit.SECONDS);
|
||||
return incomingMessages;
|
||||
expectedMessagesFuture = new CompletableFuture<>();
|
||||
expectedMessageCount = new AtomicInteger(expected);
|
||||
return expectedMessagesFuture;
|
||||
}
|
||||
|
||||
public boolean getClosed()
|
||||
{
|
||||
remoteLock.lock();
|
||||
try
|
||||
{
|
||||
return (remote == null);
|
||||
}
|
||||
finally
|
||||
{
|
||||
remoteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@OnWebSocketClose
|
||||
public void onClose(int code, String reason)
|
||||
{
|
||||
session = null;
|
||||
remoteLock.lock();
|
||||
try
|
||||
remote = null;
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
remote = null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
remoteLock.unlock();
|
||||
if ((code != StatusCode.NORMAL) ||
|
||||
(code != StatusCode.NO_CODE))
|
||||
{
|
||||
expectedMessagesFuture.completeExceptionally(new CloseException(code, reason));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@OnWebSocketError
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
LOG.warn(t);
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
expectedMessagesFuture.completeExceptionally(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@OnWebSocketMessage
|
||||
public void onMessage(String msg) throws IOException
|
||||
{
|
||||
incomingMessages.add(msg);
|
||||
messages.add(msg);
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
int countLeft = expectedMessageCount.decrementAndGet();
|
||||
if (countLeft <= 0)
|
||||
{
|
||||
expectedMessagesFuture.complete(messages);
|
||||
}
|
||||
}
|
||||
sendMessage(msg);
|
||||
}
|
||||
|
||||
|
||||
@OnWebSocketConnect
|
||||
public void onOpen(Session session)
|
||||
{
|
||||
this.session = session;
|
||||
remoteLock.lock();
|
||||
try
|
||||
{
|
||||
this.remote = session.getRemote();
|
||||
}
|
||||
finally
|
||||
{
|
||||
remoteLock.unlock();
|
||||
}
|
||||
this.remote = session.getRemote();
|
||||
}
|
||||
|
||||
|
||||
public void sendMessage(String msg) throws IOException
|
||||
{
|
||||
remoteLock.lock();
|
||||
try
|
||||
RemoteEndpoint r = remote;
|
||||
if (r == null)
|
||||
{
|
||||
RemoteEndpoint r = remote;
|
||||
if (r == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
r.sendStringByFuture(msg);
|
||||
if (r.getBatchMode() == BatchMode.ON)
|
||||
r.flush();
|
||||
}
|
||||
finally
|
||||
{
|
||||
remoteLock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
r.sendStringByFuture(msg);
|
||||
if (r.getBatchMode() == BatchMode.ON)
|
||||
r.flush();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class JsrServerEndpointFunctions_OnMessage_TextStreamTest
|
|||
DummyConnection connection = new DummyConnection(policy);
|
||||
ClientEndpointConfig config = new EmptyClientEndpointConfig();
|
||||
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
|
||||
return new JsrSession(container, id, requestURI, ei, policy, connection);
|
||||
return new JsrSession(container, id, requestURI, ei, connection);
|
||||
}
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.jsr356.server;
|
|||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -46,6 +46,7 @@ public class LargeAnnotatedTest
|
|||
@Rule
|
||||
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
@Test
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
|
@ -68,6 +69,7 @@ public class LargeAnnotatedTest
|
|||
client.getPolicy().setMaxTextMessageSize(128*1024);
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large"));
|
||||
// wait for connect
|
||||
foo.get(1,TimeUnit.SECONDS);
|
||||
|
@ -76,8 +78,8 @@ public class LargeAnnotatedTest
|
|||
Arrays.fill(txt,(byte)'o');
|
||||
String msg = new String(txt,StandardCharsets.UTF_8);
|
||||
clientEcho.sendMessage(msg);
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
Assert.assertEquals("Expected message",msg,msgs.poll());
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("Expected message",msg,msgs.get(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.jsr356.server;
|
|||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -46,6 +46,7 @@ public class LargeContainerTest
|
|||
@Rule
|
||||
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
@Test
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
|
@ -68,6 +69,7 @@ public class LargeContainerTest
|
|||
client.getPolicy().setMaxTextMessageSize(128*1024);
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
Future<Session> foo = client.connect(clientEcho,uri.resolve("echo/large"));
|
||||
// wait for connect
|
||||
foo.get(1,TimeUnit.SECONDS);
|
||||
|
@ -76,8 +78,8 @@ public class LargeContainerTest
|
|||
Arrays.fill(txt,(byte)'o');
|
||||
String msg = new String(txt,StandardCharsets.UTF_8);
|
||||
clientEcho.sendMessage(msg);
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
Assert.assertEquals("Expected message",msg,msgs.poll());
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("Expected message",msg,msgs.get(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.jsr356.server;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Queue;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -61,12 +61,13 @@ public class OnMessageReturnTest
|
|||
{
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
Future<Session> future = client.connect(clientEcho,uri.resolve("echoreturn"));
|
||||
// wait for connect
|
||||
future.get(1,TimeUnit.SECONDS);
|
||||
clientEcho.sendMessage("Hello World");
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
Assert.assertEquals("Expected message","Hello World",msgs.poll());
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("Expected message","Hello World",msgs.get(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.net.URI;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -139,12 +138,13 @@ public class SessionTest
|
|||
{
|
||||
client.start();
|
||||
JettyEchoSocket clientEcho = new JettyEchoSocket();
|
||||
Future<List<String>> clientMessagesFuture = clientEcho.expectedMessages(1);
|
||||
Future<Session> future = client.connect(clientEcho,serverUri.resolve(requestPath));
|
||||
// wait for connect
|
||||
future.get(1,TimeUnit.SECONDS);
|
||||
clientEcho.sendMessage(requestMessage);
|
||||
Queue<String> msgs = clientEcho.awaitMessages(1);
|
||||
Assert.assertThat("Expected message",msgs.poll(),is(expectedResponse));
|
||||
List<String> msgs = clientMessagesFuture.get(1, TimeUnit.SECONDS);
|
||||
Assert.assertThat("Expected message",msgs.get(0),is(expectedResponse));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -314,7 +314,7 @@ public class UpgradeConnection extends AbstractConnection implements Connection.
|
|||
|
||||
// Create WebSocket Session
|
||||
SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
|
||||
WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,policy,connection);
|
||||
WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
|
||||
session.setUpgradeRequest(request);
|
||||
session.setUpgradeResponse(response);
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
|
|||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
|
@ -97,6 +98,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
* @return the idle timeout in milliseconds
|
||||
*/
|
||||
long getMaxIdleTimeout();
|
||||
|
||||
/**
|
||||
* Get the Connection based WebSocket Policy.
|
||||
*
|
||||
* @return the WebSocket policy for the connection
|
||||
*/
|
||||
WebSocketPolicy getPolicy();
|
||||
|
||||
/**
|
||||
* Get the remote Address in use for this connection.
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.eclipse.jetty.websocket.common;
|
|||
|
||||
import java.net.URI;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
|
||||
/**
|
||||
* Interface for creating jetty {@link WebSocketSession} objects.
|
||||
*/
|
||||
|
@ -39,9 +37,8 @@ public interface SessionFactory
|
|||
*
|
||||
* @param requestURI
|
||||
* @param websocket
|
||||
* @param policy
|
||||
* @param connection
|
||||
* @return
|
||||
*/
|
||||
WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection);
|
||||
WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection);
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private UpgradeRequest upgradeRequest;
|
||||
private UpgradeResponse upgradeResponse;
|
||||
|
||||
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, WebSocketPolicy policy, LogicalConnection connection)
|
||||
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, LogicalConnection connection)
|
||||
{
|
||||
Objects.requireNonNull(containerScope, "Container Scope cannot be null");
|
||||
Objects.requireNonNull(requestURI, "Request URI cannot be null");
|
||||
|
@ -111,7 +111,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
this.executor = connection.getExecutor();
|
||||
this.outgoingHandler = connection;
|
||||
this.connection.getIOState().addListener(this);
|
||||
this.policy = policy;
|
||||
this.policy = connection.getPolicy();
|
||||
|
||||
addBean(this.connection);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common;
|
|||
import java.net.URI;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||
|
||||
|
@ -44,8 +43,8 @@ public class WebSocketSessionFactory implements SessionFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public WebSocketSession createSession(URI requestURI, Object websocket, WebSocketPolicy policy, LogicalConnection connection)
|
||||
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
|
||||
{
|
||||
return new WebSocketSession(containerScope, requestURI, websocket, policy, connection);
|
||||
return new WebSocketSession(containerScope, requestURI, websocket, connection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,6 +214,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private final Scheduler scheduler;
|
||||
private final Generator generator;
|
||||
private final Parser parser;
|
||||
private final WebSocketPolicy policy;
|
||||
private final WebSocketBehavior behavior;
|
||||
private final AtomicBoolean suspendToken;
|
||||
private final FrameFlusher flusher;
|
||||
|
@ -233,6 +234,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
endp.getLocalAddress().getPort(),
|
||||
endp.getRemoteAddress().getAddress().getHostAddress(),
|
||||
endp.getRemoteAddress().getPort());
|
||||
this.policy = policy;
|
||||
this.behavior = policy.getBehavior();
|
||||
this.bufferPool = bufferPool;
|
||||
this.generator = new Generator(policy,bufferPool);
|
||||
|
@ -389,7 +391,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
return parser;
|
||||
}
|
||||
|
||||
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress()
|
||||
{
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
|||
public class StringMessageSink implements MessageSink
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(StringMessageSink.class);
|
||||
private final WebSocketPolicy policy;
|
||||
private WebSocketPolicy policy;
|
||||
private final Function<String, Void> onMessageFunction;
|
||||
private Utf8StringBuilder utf;
|
||||
private int size = 0;
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
|
|||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames, ConnectionStateListener
|
||||
|
@ -44,7 +45,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
private final String id;
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final Executor executor;
|
||||
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
|
||||
private WebSocketPolicy policy;
|
||||
private IncomingFrames incoming;
|
||||
private IOState ioState = new IOState();
|
||||
|
||||
|
@ -59,6 +60,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
this.bufferPool = bufferPool;
|
||||
this.executor = new ExecutorThreadPool();
|
||||
this.ioState.addListener(this);
|
||||
this.policy = WebSocketPolicy.newServerPolicy();
|
||||
}
|
||||
|
||||
public LocalWebSocketConnection(TestName testname, ByteBufferPool bufferPool)
|
||||
|
@ -66,6 +68,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
this(testname.getMethodName(),bufferPool);
|
||||
}
|
||||
|
||||
public LocalWebSocketConnection(TestName testname, WebSocketContainerScope containerScope)
|
||||
{
|
||||
this(testname.getMethodName(), containerScope.getBufferPool());
|
||||
this.policy = containerScope.getPolicy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor()
|
||||
{
|
||||
|
@ -141,7 +149,13 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress()
|
||||
{
|
||||
|
|
|
@ -34,8 +34,7 @@ public class LocalWebSocketSession extends WebSocketSession
|
|||
public LocalWebSocketSession(WebSocketContainerScope containerScope, TestName testname, Object websocket)
|
||||
{
|
||||
super(containerScope,URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),websocket,
|
||||
containerScope.getPolicy(),
|
||||
new LocalWebSocketConnection(testname,containerScope.getBufferPool()));
|
||||
new LocalWebSocketConnection(testname,containerScope));
|
||||
this.id = testname.getMethodName();
|
||||
outgoingCapture = new OutgoingFramesCapture();
|
||||
setOutgoingHandler(outgoingCapture);
|
||||
|
|
|
@ -82,7 +82,7 @@ public class MessageWriterTest
|
|||
remoteSocket = new TrackingSocket("remote");
|
||||
URI remoteURI = new URI("ws://localhost/remote");
|
||||
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(bufferPool);
|
||||
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,policy,remoteConnection);
|
||||
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,remoteConnection);
|
||||
OutgoingFrames socketPipe = FramePipes.to(remoteSession);
|
||||
remoteSession.start();
|
||||
remoteSession.open();
|
||||
|
@ -91,7 +91,7 @@ public class MessageWriterTest
|
|||
TrackingSocket localSocket = new TrackingSocket("local");
|
||||
URI localURI = new URI("ws://localhost/local");
|
||||
LocalWebSocketConnection localConnection = new LocalWebSocketConnection(bufferPool);
|
||||
session = new WebSocketSession(containerScope,localURI,localSocket,policy,localConnection);
|
||||
session = new WebSocketSession(containerScope,localURI,localSocket,localConnection);
|
||||
|
||||
// talk to our remote socket
|
||||
session.setOutgoingHandler(socketPipe);
|
||||
|
|
|
@ -108,7 +108,13 @@ public class DummyConnection implements LogicalConnection
|
|||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress()
|
||||
{
|
||||
|
|
|
@ -255,7 +255,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
|
|||
{
|
||||
try
|
||||
{
|
||||
return impl.createSession(requestURI, websocket, containerPolicy, connection);
|
||||
return impl.createSession(requestURI, websocket, connection);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
@ -583,7 +583,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
|
|||
// Setup websocket connection
|
||||
AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool);
|
||||
|
||||
extensionStack.setPolicy(containerPolicy);
|
||||
extensionStack.setPolicy(wsConnection.getPolicy());
|
||||
extensionStack.configure(wsConnection.getParser());
|
||||
extensionStack.configure(wsConnection.getGenerator());
|
||||
|
||||
|
|
Loading…
Reference in New Issue