Fixes #388 - Provide pluggable RemoteEndpoint service

This commit is contained in:
Joakim Erdfelt 2016-06-06 07:24:59 -07:00
parent d6bbe9ab12
commit 8758cbb607
15 changed files with 154 additions and 83 deletions

View File

@ -33,8 +33,8 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.jsr356.EchoHandler;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrEndpointEventDriver;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -90,7 +90,7 @@ public class MisbehavingClassTest
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
EndpointRuntimeOnOpen socket = new EndpointRuntimeOnOpen();
try (StacklessLogging logging = new StacklessLogging(EndpointRuntimeOnOpen.class,JsrEndpointEventDriver.class))
try (StacklessLogging logging = new StacklessLogging(EndpointRuntimeOnOpen.class, WebSocketSession.class))
{
// expecting ArrayIndexOutOfBoundsException during onOpen
Session session = container.connectToServer(socket,serverUri);
@ -111,7 +111,7 @@ public class MisbehavingClassTest
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
AnnotatedRuntimeOnOpen socket = new AnnotatedRuntimeOnOpen();
try (StacklessLogging logging = new StacklessLogging(AnnotatedRuntimeOnOpen.class))
try (StacklessLogging logging = new StacklessLogging(AnnotatedRuntimeOnOpen.class, WebSocketSession.class))
{
// expecting ArrayIndexOutOfBoundsException during onOpen
Session session = container.connectToServer(socket,serverUri);

View File

@ -88,6 +88,7 @@ public class OnPartialTest
@SuppressWarnings("resource")
JsrSession session = new JsrSession(container,id,requestURI,driver,connection);
session.setPolicy(policy);
session.start();
session.open();
return driver;
}

View File

@ -6,8 +6,8 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
org.eclipse.jetty.websocket.jsr356.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
# org.eclipse.jetty.websocket.jsr356.LEVEL=DEBUG
### Show state changes on BrowserDebugTool
# -- LEAVE THIS AT DEBUG LEVEL --

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
@ -160,6 +161,13 @@ public interface RemoteEndpoint
* @see #flush()
*/
void setBatchMode(BatchMode mode);
/**
* Get the InetSocketAddress for the established connection.
*
* @return the InetSocketAddress for the established connection. (or null, if the connection is no longer established)
*/
InetSocketAddress getInetSocketAddress();
/**
* Flushes messages that may have been batched by the implementation.

View File

@ -19,9 +19,10 @@
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
public interface RemoteEndpointFactory
{
WebSocketRemoteEndpoint newRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoingFrames, BatchMode batchMode);
RemoteEndpoint newRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoingFrames, BatchMode batchMode);
}

View File

@ -216,9 +216,15 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
}
/**
* Get the InetSocketAddress for the established connection.
*
* @return the InetSocketAddress for the established connection. (or null, if the connection is no longer established)
*/
public InetSocketAddress getInetSocketAddress()
{
if(connection == null)
return null;
return connection.getRemoteAddress();
}

View File

@ -75,7 +75,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private RemoteEndpointFactory remoteEndpointFactory;
private String protocolVersion;
private Map<String, String[]> parameterMap = new HashMap<>();
private WebSocketRemoteEndpoint remote;
private RemoteEndpoint remote;
private IncomingFrames incomingHandler;
private OutgoingFrames outgoingHandler;
private WebSocketPolicy policy;

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.PingFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.CloseableLocalWebSocketSession;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -67,12 +68,12 @@ public class EventDriverTest
}
@Test
public void testAdapter_ConnectClose() throws IOException
public void testAdapter_ConnectClose() throws Exception
{
AdapterConnectCloseSocket socket = new AdapterConnectCloseSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.open();
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
@ -84,12 +85,12 @@ public class EventDriverTest
}
@Test
public void testAnnotated_ByteArray() throws IOException
public void testAnnotated_ByteArray() throws Exception
{
AnnotatedBinaryArraySocket socket = new AnnotatedBinaryArraySocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.open();
driver.incomingFrame(makeBinaryFrame("Hello World",true));
@ -103,12 +104,12 @@ public class EventDriverTest
}
@Test
public void testAnnotated_Error() throws IOException
public void testAnnotated_Error() throws Exception
{
AnnotatedTextSocket socket = new AnnotatedTextSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.open();
driver.incomingError(new WebSocketException("oof"));
@ -122,12 +123,12 @@ public class EventDriverTest
}
@Test
public void testAnnotated_Frames() throws IOException
public void testAnnotated_Frames() throws Exception
{
AnnotatedFramesSocket socket = new AnnotatedFramesSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.open();
driver.incomingFrame(new PingFrame().setPayload("PING"));
@ -151,7 +152,7 @@ public class EventDriverTest
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.open();
driver.incomingFrame(makeBinaryFrame("Hello World",true));
@ -170,7 +171,7 @@ public class EventDriverTest
ListenerBasicSocket socket = new ListenerBasicSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(container,testname,driver))
try (LocalWebSocketSession conn = new CloseableLocalWebSocketSession(container,testname,driver))
{
conn.start();
conn.open();

View File

@ -0,0 +1,57 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.junit.rules.TestName;
public class CloseableLocalWebSocketSession extends LocalWebSocketSession implements AutoCloseable
{
public CloseableLocalWebSocketSession(WebSocketContainerScope containerScope, TestName testname, EventDriver driver)
{
super(containerScope, testname, driver);
// LifeCycle start
try
{
start();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
@Override
public void close()
{
// WebSocketSession.close();
super.close();
// LifeCycle Stop
try
{
stop();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}

View File

@ -61,13 +61,14 @@ public class MessageOutputStreamTest
private LocalWebSocketSession session;
@After
public void closeSession()
public void closeSession() throws Exception
{
session.close();
session.stop();
}
@Before
public void setupSession()
public void setupSession() throws Exception
{
policy = WebSocketPolicy.newServerPolicy();
policy.setInputBufferSize(1024);
@ -91,6 +92,8 @@ public class MessageOutputStreamTest
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// start session
session.start();
// open connection
session.open();
}

View File

@ -59,13 +59,14 @@ public class MessageWriterTest
private LocalWebSocketSession session;
@After
public void closeSession()
public void closeSession() throws Exception
{
session.close();
session.stop();
}
@Before
public void setupSession()
public void setupSession() throws Exception
{
policy = WebSocketPolicy.newServerPolicy();
policy.setInputBufferSize(1024);
@ -89,6 +90,8 @@ public class MessageWriterTest
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// start session
session.start();
// open connection
session.open();
}

View File

@ -18,8 +18,9 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.Collection;
@ -38,7 +39,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
@ -257,7 +257,7 @@ public class WebSocketCloseTest
{
client.setProtocols("fastfail");
client.setTimeout(1,TimeUnit.SECONDS);
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
try (StacklessLogging scope = new StacklessLogging(FastFailSocket.class, WebSocketSession.class))
{
client.connect();
client.sendStandardRequest();

View File

@ -39,11 +39,7 @@ public class AnnotatedRuntimeOnConnectSocket
public void onWebSocketConnect(Session sess)
{
// Intentional runtime exception.
int[] arr = new int[5];
for (int i = 0; i < 10; i++)
{
arr[i] = 222;
}
throw new RuntimeException("Intentional Exception from onWebSocketConnect");
}
@OnWebSocketClose

View File

@ -37,11 +37,7 @@ public class ListenerRuntimeOnConnectSocket extends WebSocketAdapter
super.onWebSocketConnect(sess);
// Intentional runtime exception.
int[] arr = new int[5];
for (int i = 0; i < 10; i++)
{
arr[i] = 222;
}
throw new RuntimeException("Intentional Exception from onWebSocketConnect");
}
@Override

View File

@ -18,8 +18,9 @@
package org.eclipse.jetty.websocket.server.misbehaving;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.concurrent.TimeUnit;
@ -29,7 +30,7 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.server.SimpleServletServer;
@ -63,72 +64,70 @@ public class MisbehavingClassTest
@Test
public void testListenerRuntimeOnConnect() throws Exception
{
try (IBlockheadClient client = new BlockheadClient(server.getServerUri()))
try (IBlockheadClient client = new BlockheadClient(server.getServerUri());
StacklessLogging scope = new StacklessLogging(ListenerRuntimeOnConnectSocket.class, WebSocketSession.class))
{
client.setProtocols("listener-runtime-connect");
client.setTimeout(1,TimeUnit.SECONDS);
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
{
ListenerRuntimeOnConnectSocket socket = badSocketsServlet.listenerRuntimeConnect;
socket.reset();
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
ListenerRuntimeOnConnectSocket socket = badSocketsServlet.listenerRuntimeConnect;
socket.reset();
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
client.write(close.asFrame()); // respond with close
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
// ensure server socket got close event
assertThat("Close Latch",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
assertThat("closeStatusCode",socket.closeStatusCode,is(StatusCode.SERVER_ERROR));
client.write(close.asFrame()); // respond with close
// Validate errors
assertThat("socket.onErrors",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error type",cause,instanceOf(ArrayIndexOutOfBoundsException.class));
}
// ensure server socket got close event
assertThat("Close Latch",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
assertThat("closeStatusCode",socket.closeStatusCode,is(StatusCode.SERVER_ERROR));
// Validate errors
assertThat("socket.onErrors",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error type",cause,instanceOf(RuntimeException.class));
}
}
@Test
public void testAnnotatedRuntimeOnConnect() throws Exception
{
try (IBlockheadClient client = new BlockheadClient(server.getServerUri()))
try (IBlockheadClient client = new BlockheadClient(server.getServerUri());
StacklessLogging scope = new StacklessLogging(AnnotatedRuntimeOnConnectSocket.class, WebSocketSession.class))
{
client.setProtocols("annotated-runtime-connect");
client.setTimeout(1,TimeUnit.SECONDS);
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
{
AnnotatedRuntimeOnConnectSocket socket = badSocketsServlet.annotatedRuntimeConnect;
socket.reset();
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
AnnotatedRuntimeOnConnectSocket socket = badSocketsServlet.annotatedRuntimeConnect;
socket.reset();
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
client.write(close.asFrame()); // respond with close
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
// ensure server socket got close event
assertThat("Close Latch",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
assertThat("closeStatusCode",socket.closeStatusCode,is(StatusCode.SERVER_ERROR));
client.write(close.asFrame()); // respond with close
// Validate errors
assertThat("socket.onErrors",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error type",cause,instanceOf(ArrayIndexOutOfBoundsException.class));
}
// ensure server socket got close event
assertThat("Close Latch",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
assertThat("closeStatusCode",socket.closeStatusCode,is(StatusCode.SERVER_ERROR));
// Validate errors
assertThat("socket.onErrors",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error type",cause,instanceOf(RuntimeException.class));
}
}
}