Fixes #117 - working build of WebSocketClient with HttpClient

+ Proxy support exists now
This commit is contained in:
Joakim Erdfelt 2016-12-01 15:00:41 -07:00
parent 646a411746
commit b70101b93f
22 changed files with 116 additions and 101 deletions

View File

@ -125,7 +125,7 @@ public class ClientContainer extends ContainerLifeCycle implements WebSocketCont
ClientEndpointConfig config = (ClientEndpointConfig)instance.getConfig();
ClientUpgradeRequest req = new ClientUpgradeRequest();
UpgradeListener upgradeListener = null;
for (Extension ext : config.getExtensions())
{
req.addExtensions(new JsrExtensionConfig(ext));

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.jsr356;
import java.net.HttpCookie;
import java.util.List;
import java.util.Map;
@ -49,21 +48,7 @@ public class JsrUpgradeListener implements UpgradeListener
configurator.beforeRequest(headers);
// Handle cookies
for (String name : headers.keySet())
{
if ("cookie".equalsIgnoreCase(name))
{
List<String> values = headers.get(name);
if (values != null)
{
for (String cookie : values)
{
List<HttpCookie> cookies = HttpCookie.parse(cookie);
request.getCookies().addAll(cookies);
}
}
}
}
request.setHeaders(headers);
}
@Override

View File

@ -85,6 +85,8 @@ public abstract class AnnotatedEndpointMetadata<T extends Annotation, C extends
private final Class<?> endpointClass;
private DecoderMetadataSet decoders;
private EncoderMetadataSet encoders;
private long maxTextMessageSize = -1;
private long maxBinaryMessageSize = -1;
protected AnnotatedEndpointMetadata(Class<?> endpointClass)
{
@ -119,7 +121,19 @@ public abstract class AnnotatedEndpointMetadata<T extends Annotation, C extends
public abstract T getAnnotation();
public abstract C getConfig();
@Override
public long maxBinaryMessageSize()
{
return maxBinaryMessageSize;
}
@Override
public long maxTextMessageSize()
{
return maxTextMessageSize;
}
@Override
public DecoderMetadataSet getDecoders()
{
@ -137,4 +151,14 @@ public abstract class AnnotatedEndpointMetadata<T extends Annotation, C extends
{
return endpointClass;
}
public void setMaxBinaryMessageSize(long maxBinaryMessageSize)
{
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
public void setMaxTextMessageSize(long maxTextMessageSize)
{
this.maxTextMessageSize = maxTextMessageSize;
}
}

View File

@ -134,20 +134,25 @@ public class AnnotatedEndpointScanner<T extends Annotation, C extends EndpointCo
OnMessageCallable onmessage = new OnMessageCallable(pojo,method);
visitMethod(onmessage,pojo,method,paramsOnMessage,OnMessage.class);
OnMessage messageAnno = (OnMessage) annotation;
Param param = onmessage.getMessageObjectParam();
switch (param.role)
{
case MESSAGE_BINARY:
metadata.onBinary = new OnMessageBinaryCallable(onmessage);
metadata.setMaxBinaryMessageSize(messageAnno.maxMessageSize());
break;
case MESSAGE_BINARY_STREAM:
metadata.onBinaryStream = new OnMessageBinaryStreamCallable(onmessage);
metadata.setMaxBinaryMessageSize(messageAnno.maxMessageSize());
break;
case MESSAGE_TEXT:
metadata.onText = new OnMessageTextCallable(onmessage);
metadata.setMaxTextMessageSize(messageAnno.maxMessageSize());
break;
case MESSAGE_TEXT_STREAM:
metadata.onTextStream = new OnMessageTextStreamCallable(onmessage);
metadata.setMaxTextMessageSize(messageAnno.maxMessageSize());
break;
case MESSAGE_PONG:
metadata.onPong = new OnMessagePongCallable(onmessage);

View File

@ -21,13 +21,11 @@ package org.eclipse.jetty.websocket.jsr356.client;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DeploymentException;
import javax.websocket.OnMessage;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverImpl;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
import org.eclipse.jetty.websocket.jsr356.annotations.OnMessageCallable;
import org.eclipse.jetty.websocket.jsr356.endpoints.EndpointInstance;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrAnnotatedEventDriver;
@ -49,8 +47,8 @@ public class JsrClientEndpointImpl implements EventDriverImpl
JsrEvents<ClientEndpoint, ClientEndpointConfig> events = new JsrEvents<>(metadata);
// Handle @OnMessage maxMessageSizes
int maxBinaryMessage = getMaxMessageSize(policy.getMaxBinaryMessageSize(),metadata.onBinary,metadata.onBinaryStream);
int maxTextMessage = getMaxMessageSize(policy.getMaxTextMessageSize(),metadata.onText,metadata.onTextStream);
int maxBinaryMessage = getMaxMessageSize(policy.getMaxBinaryMessageSize(),metadata.maxBinaryMessageSize());
int maxTextMessage = getMaxMessageSize(policy.getMaxTextMessageSize(),metadata.maxTextMessageSize());
policy.setMaxBinaryMessageSize(maxBinaryMessage);
policy.setMaxTextMessageSize(maxTextMessage);
@ -64,23 +62,11 @@ public class JsrClientEndpointImpl implements EventDriverImpl
return "class is annotated with @" + ClientEndpoint.class.getName();
}
private int getMaxMessageSize(int defaultMaxMessageSize, OnMessageCallable... onMessages)
private int getMaxMessageSize(int defaultMaxMessageSize, long maxMessageSize)
{
for (OnMessageCallable callable : onMessages)
if (maxMessageSize >= 1)
{
if (callable == null)
{
continue;
}
OnMessage onMsg = callable.getMethod().getAnnotation(OnMessage.class);
if (onMsg == null)
{
continue;
}
if (onMsg.maxMessageSize() > 0)
{
return (int)onMsg.maxMessageSize();
}
return (int) maxMessageSize;
}
return defaultMaxMessageSize;
}

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
/**
* Base implementation for JSR-356 Annotated event drivers.
@ -54,6 +55,13 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
{
super(policy,endpointInstance);
this.events = events;
EndpointMetadata metadata = endpointInstance.getMetadata();
if (metadata.maxTextMessageSize() >= 1)
policy.setMaxTextMessageSize((int) metadata.maxTextMessageSize());
if (metadata.maxBinaryMessageSize() >= 1)
policy.setMaxBinaryMessageSize((int) metadata.maxBinaryMessageSize());
}
@Override

View File

@ -20,9 +20,13 @@ package org.eclipse.jetty.websocket.jsr356.metadata;
public interface EndpointMetadata
{
public DecoderMetadataSet getDecoders();
DecoderMetadataSet getDecoders();
public EncoderMetadataSet getEncoders();
public Class<?> getEndpointClass();
EncoderMetadataSet getEncoders();
Class<?> getEndpointClass();
default long maxTextMessageSize() { return -1; }
default long maxBinaryMessageSize() { return -1; }
}

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.websocket.jsr356;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.net.HttpCookie;
import java.net.URI;
import java.util.Collections;
@ -73,14 +77,15 @@ public class CookiesTest
final String cookieName = "name";
final String cookieValue = "value";
final String cookieString = cookieName + "=" + cookieValue;
startServer(new EchoHandler()
{
@Override
public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response)
{
List<HttpCookie> cookies = request.getCookies();
Assert.assertNotNull(cookies);
Assert.assertEquals(1, cookies.size());
assertThat("Cookies", cookies, notNullValue());
assertThat("Cookies", cookies.size(), is(1));
HttpCookie cookie = cookies.get(0);
Assert.assertEquals(cookieName, cookie.getName());
Assert.assertEquals(cookieValue, cookie.getValue());

View File

@ -42,12 +42,7 @@ public class AnnotatedRuntimeOnOpen
@OnOpen
public void onOpen(Session session, EndpointConfig config)
{
// Intentional runtime exception.
int[] arr = new int[5];
for (int i = 0; i < 10; i++)
{
arr[i] = 222;
}
throw new RuntimeException("Intentionally Misbehaving");
}
@OnClose

View File

@ -38,12 +38,7 @@ public class EndpointRuntimeOnOpen extends Endpoint
@Override
public void onOpen(Session session, EndpointConfig config)
{
// Intentional runtime exception.
int[] arr = new int[5];
for (int i = 0; i < 10; i++)
{
arr[i] = 222;
}
throw new RuntimeException("Intentionally Misbehaving");
}
@Override

View File

@ -22,11 +22,11 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.server.Server;
@ -37,14 +37,20 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.jsr356.EchoHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class MisbehavingClassTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static Server server;
private static EchoHandler handler;
private static URI serverUri;
@SuppressWarnings("Duplicates")
@BeforeClass
public static void startServer() throws Exception
{
@ -84,6 +90,7 @@ public class MisbehavingClassTest
}
}
@SuppressWarnings("Duplicates")
@Test
public void testEndpointRuntimeOnOpen() throws Exception
{
@ -92,19 +99,20 @@ public class MisbehavingClassTest
try (StacklessLogging logging = new StacklessLogging(EndpointRuntimeOnOpen.class, WebSocketSession.class))
{
// expecting ArrayIndexOutOfBoundsException during onOpen
Session session = container.connectToServer(socket,serverUri);
// expecting IOException during onOpen
expectedException.expect(IOException.class);
expectedException.expectCause(instanceOf(RuntimeException.class));
container.connectToServer(socket, serverUri);
expectedException.reportMissingExceptionWithMessage("Should have failed .connectToServer()");
assertThat("Close should have occurred",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
// technically, the session object isn't invalid here.
assertThat("Session.isOpen",session.isOpen(),is(false));
assertThat("Should have only had 1 error",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error",cause,instanceOf(ArrayIndexOutOfBoundsException.class));
assertThat("Error",cause,instanceOf(RuntimeException.class));
}
}
@SuppressWarnings("Duplicates")
@Test
public void testAnnotatedRuntimeOnOpen() throws Exception
{
@ -113,14 +121,14 @@ public class MisbehavingClassTest
try (StacklessLogging logging = new StacklessLogging(AnnotatedRuntimeOnOpen.class, WebSocketSession.class))
{
// expecting ArrayIndexOutOfBoundsException during onOpen
Session session = container.connectToServer(socket,serverUri);
// expecting IOException during onOpen
expectedException.expect(IOException.class);
expectedException.expectCause(instanceOf(RuntimeException.class));
container.connectToServer(socket, serverUri);
expectedException.reportMissingExceptionWithMessage("Should have failed .connectToServer()");
assertThat("Close should have occurred",socket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
// technically, the session object isn't invalid here.
assertThat("Session.isOpen",session.isOpen(),is(false));
assertThat("Should have only had 1 error",socket.errors.size(),is(1));
Throwable cause = socket.errors.pop();
assertThat("Error",cause,instanceOf(ArrayIndexOutOfBoundsException.class));
}

View File

@ -47,7 +47,7 @@ public class AnnotatedServerEndpointMetadata extends AnnotatedEndpointMetadata<S
this.endpoint = anno;
this.config = new AnnotatedServerEndpointConfig(containerScope,websocket,anno,baseConfig);
getDecoders().addAll(anno.decoders());
getDecoders().addAll(anno.decoders());
getEncoders().addAll(anno.encoders());
}

View File

@ -273,7 +273,7 @@ public class ConfiguratorTest
{
List<String> selectedProtocol = response.getHeaders().get("Sec-WebSocket-Protocol");
String protocol = "<>";
if (selectedProtocol != null || !selectedProtocol.isEmpty())
if (selectedProtocol != null && !selectedProtocol.isEmpty())
protocol = selectedProtocol.get(0);
config.getUserProperties().put("selected-subprotocol", protocol);
}

View File

@ -146,9 +146,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*
* @param sslContextFactory
* ssl context factory to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/
@Deprecated
public WebSocketClient(SslContextFactory sslContextFactory)
{
this(sslContextFactory,null);
@ -257,7 +255,12 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
*/
public WebSocketClient(WebSocketContainerScope scope, EventDriverFactory eventDriverFactory, SessionFactory sessionFactory)
{
this.httpClient = new HttpClient(scope.getSslContextFactory());
SslContextFactory sslContextFactory = scope.getSslContextFactory();
if(sslContextFactory == null)
{
sslContextFactory = new SslContextFactory();
}
this.httpClient = new HttpClient(sslContextFactory);
this.httpClient.setExecutor(scope.getExecutor());
addBean(this.httpClient);

View File

@ -469,6 +469,11 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
header(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL,protocol);
}
}
if (upgradeListener != null)
{
upgradeListener.onHandshakeRequest(apiRequestFacade);
}
}
@Override
@ -550,11 +555,6 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
throw new HttpResponseException("Not WebSocket Upgrade",response);
}
if (upgradeListener != null)
{
upgradeListener.onHandshakeRequest(apiRequestFacade);
}
// Check the Accept hash
String reqKey = this.getHeaders().get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = AcceptHash.hashKey(reqKey);

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
*/
public interface UpgradeListener
{
public void onHandshakeRequest(UpgradeRequest request);
void onHandshakeRequest(UpgradeRequest request);
public void onHandshakeResponse(UpgradeResponse response);
void onHandshakeResponse(UpgradeResponse response);
}

View File

@ -98,7 +98,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
this.outgoingHandler = connection;
this.incomingHandler = websocket;
this.connection.getIOState().addListener(this);
this.policy = containerScope.getPolicy();
this.policy = websocket.getPolicy();
addBean(this.connection);
addBean(this.websocket);
@ -350,16 +350,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
}
/**
* Incoming Errors from Parser
* Incoming Errors
*/
@Override
public void incomingError(Throwable t)
{
if (connection.getIOState().isInputAvailable())
{
// Forward Errors to User WebSocket Object
websocket.incomingError(t);
}
// Forward Errors to User WebSocket Object
websocket.incomingError(t);
}
/**
@ -418,6 +415,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
public void notifyError(Throwable cause)
{
if (openFuture != null && !openFuture.isDone())
openFuture.completeExceptionally(cause);
incomingError(cause);
}

View File

@ -234,7 +234,7 @@ public abstract class AbstractEventDriver extends AbstractLifeCycle implements I
}
catch (Throwable t)
{
unhandled(t);
this.session.notifyError(t);
throw t;
}
}

View File

@ -55,7 +55,7 @@ public class SimpleContainerScope extends ContainerLifeCycle implements WebSocke
this.objectFactory = objectFactory;
QueuedThreadPool threadPool = new QueuedThreadPool();
String name = "WebSocketSimpleContainer@" + hashCode();
String name = SimpleContainerScope.class.getSimpleName() + ".Executor@" + hashCode();
threadPool.setName(name);
threadPool.setDaemon(true);
this.executor = threadPool;

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -30,23 +34,17 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
import org.eclipse.jetty.websocket.common.util.Sha1Sum;
import org.eclipse.jetty.websocket.server.helper.CaptureSocket;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class)
public class PerMessageDeflateExtensionTest
{
@ -83,9 +81,6 @@ public class PerMessageDeflateExtensionTest
return modes;
}
@Rule
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
private SimpleServletServer server;
private String scheme;
private int msgSize;
@ -132,7 +127,7 @@ public class PerMessageDeflateExtensionTest
serverPolicy.setMaxBinaryMessageSize(binBufferSize);
serverPolicy.setMaxBinaryMessageBufferSize(binBufferSize);
WebSocketClient client = new WebSocketClient(server.getSslContextFactory(),null,bufferPool);
WebSocketClient client = new WebSocketClient(server.getSslContextFactory());
WebSocketPolicy clientPolicy = client.getPolicy();
clientPolicy.setMaxBinaryMessageSize(binBufferSize);
clientPolicy.setMaxBinaryMessageBufferSize(binBufferSize);

View File

@ -324,6 +324,7 @@ public class WebSocketCloseTest
}
}
@SuppressWarnings("Duplicates")
private void fastClose() throws Exception
{
try (IBlockheadClient client = new BlockheadClient(server.getServerUri()))
@ -376,6 +377,7 @@ public class WebSocketCloseTest
}
}
@SuppressWarnings("Duplicates")
private void dropConnection() throws Exception
{
try (IBlockheadClient client = new BlockheadClient(server.getServerUri()))

View File

@ -90,6 +90,7 @@ public class ServletUpgradeRequest implements UpgradeRequest
request.complete();
}
@SuppressWarnings("unused")
public X509Certificate[] getCertificates()
{
return (X509Certificate[])request.getAttribute("javax.servlet.request.X509Certificate");