Issue #207 - Support javax.websocket version 1.1

This commit is contained in:
Joakim Erdfelt 2016-08-10 17:18:18 -07:00
parent 1f196f5276
commit 357fae18ae
13 changed files with 703 additions and 385 deletions

View File

@ -18,16 +18,16 @@
package org.eclipse.jetty.websocket.jsr356.decoders;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.PongMessage;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
@ -35,25 +35,68 @@ import org.eclipse.jetty.websocket.common.util.ReflectUtils;
public class AvailableDecoders implements Predicate<Class<?>>
{
private static class RegisteredDecoder implements Predicate<Class<?>>
public static class RegisteredDecoder
{
public final Class<? extends Decoder> decoder;
public final Class<? extends Decoder> interfaceType;
public final Class<?> objectType;
public RegisteredDecoder(Class<? extends Decoder> decoder, Class<?> objectType)
public RegisteredDecoder(Class<? extends Decoder> decoder, Class<? extends Decoder> interfaceType, Class<?> objectType)
{
this.decoder = decoder;
this.interfaceType = interfaceType;
this.objectType = objectType;
}
@Override
public boolean test(Class<?> type)
public boolean implementsInterface(Class<? extends Decoder> type)
{
return interfaceType.isAssignableFrom(type);
}
public boolean isType(Class<?> type)
{
return objectType.isAssignableFrom(type);
}
}
private List<RegisteredDecoder> registeredDecoders;
private LinkedList<RegisteredDecoder> registeredDecoders;
public AvailableDecoders()
{
registeredDecoders = new LinkedList<>();
// TEXT based [via Class reference]
register(BooleanDecoder.class, Decoder.Text.class, Boolean.class);
register(ByteDecoder.class, Decoder.Text.class, Byte.class);
register(CharacterDecoder.class, Decoder.Text.class, Character.class);
register(DoubleDecoder.class, Decoder.Text.class, Double.class);
register(FloatDecoder.class, Decoder.Text.class, Float.class);
register(IntegerDecoder.class, Decoder.Text.class, Integer.class);
register(LongDecoder.class, Decoder.Text.class, Long.class);
register(StringDecoder.class, Decoder.Text.class, String.class);
// TEXT based [via Primitive reference]
register(BooleanDecoder.class, Decoder.Text.class, Boolean.TYPE);
register(ByteDecoder.class, Decoder.Text.class, Byte.TYPE);
register(CharacterDecoder.class, Decoder.Text.class, Character.TYPE);
register(DoubleDecoder.class, Decoder.Text.class, Double.TYPE);
register(FloatDecoder.class, Decoder.Text.class, Float.TYPE);
register(IntegerDecoder.class, Decoder.Text.class, Integer.TYPE);
register(LongDecoder.class, Decoder.Text.class, Long.TYPE);
// BINARY based
register(ByteBufferDecoder.class, Decoder.Binary.class, ByteBuffer.class);
register(ByteArrayDecoder.class, Decoder.Binary.class, byte[].class);
// STREAMING based
register(ReaderDecoder.class, Decoder.TextStream.class, Reader.class);
register(InputStreamDecoder.class, Decoder.BinaryStream.class, InputStreamDecoder.class);
}
private void register(Class<? extends Decoder> decoderClass, Class<? extends Decoder> interfaceType, Class<?> type)
{
registeredDecoders.add(new RegisteredDecoder(decoderClass, interfaceType, type));
}
public void register(Class<? extends Decoder> decoder)
{
@ -113,7 +156,7 @@ public class AvailableDecoders implements Predicate<Class<?>>
decoders.forEach(this::register);
}
private void add(Class<? extends Decoder> decoder, Class<?> interfaceClass)
private void add(Class<? extends Decoder> decoder, Class<? extends Decoder> interfaceClass)
{
Class<?> objectType = ReflectUtils.findGenericClassFor(decoder, interfaceClass);
if (objectType == null)
@ -126,58 +169,31 @@ public class AvailableDecoders implements Predicate<Class<?>>
throw new InvalidWebSocketException(err.toString());
}
if (registeredDecoders == null)
registeredDecoders = new ArrayList<>();
registeredDecoders.add(new RegisteredDecoder(decoder, interfaceClass, objectType));
}
registeredDecoders.add(new RegisteredDecoder(decoder, objectType));
public List<RegisteredDecoder> supporting(Class<? extends Decoder> interfaceType)
{
return registeredDecoders.stream()
.filter(registered -> registered.implementsInterface(interfaceType))
.collect(Collectors.toList());
}
public Class<? extends Decoder> getDecoderFor(Class<?> type)
{
// Check registered decoders first
if (registeredDecoders != null)
try
{
for (RegisteredDecoder registered : registeredDecoders)
return registeredDecoders.stream()
.filter(registered -> registered.isType(type))
.findFirst()
.get()
.decoder;
}
catch (NoSuchElementException e)
{
if (registered.objectType.isAssignableFrom(type))
return registered.decoder;
}
}
// Check default decoders next
// TEXT based [via Class reference]
if (Boolean.class.isAssignableFrom(type)) return BooleanDecoder.class;
if (Byte.class.isAssignableFrom(type)) return ByteDecoder.class;
if (Character.class.isAssignableFrom(type)) return CharacterDecoder.class;
if (Double.class.isAssignableFrom(type)) return DoubleDecoder.class;
if (Float.class.isAssignableFrom(type)) return FloatDecoder.class;
if (Integer.class.isAssignableFrom(type)) return IntegerDecoder.class;
if (Long.class.isAssignableFrom(type)) return LongDecoder.class;
if (String.class.isAssignableFrom(type)) return StringDecoder.class;
// TEXT based [via Primitive reference]
if (Boolean.TYPE.isAssignableFrom(type)) return BooleanDecoder.class;
if (Byte.TYPE.isAssignableFrom(type)) return ByteDecoder.class;
if (Character.TYPE.isAssignableFrom(type)) return CharacterDecoder.class;
if (Double.TYPE.isAssignableFrom(type)) return DoubleDecoder.class;
if (Float.TYPE.isAssignableFrom(type)) return FloatDecoder.class;
if (Integer.TYPE.isAssignableFrom(type)) return IntegerDecoder.class;
if (Long.TYPE.isAssignableFrom(type)) return LongDecoder.class;
// BINARY based
if (ByteBuffer.class.isAssignableFrom(type)) return ByteBufferDecoder.class;
if (byte[].class.isAssignableFrom(type)) return ByteArrayDecoder.class;
// PONG based
if (PongMessage.class.isAssignableFrom(type)) return PongMessageDecoder.class;
// STREAMING based
if (Reader.class.isAssignableFrom(type)) return ReaderDecoder.class;
if (InputStream.class.isAssignableFrom(type)) return InputStreamDecoder.class;
throw new InvalidWebSocketException("No Decoder found for type " + type);
}
}
public static Object decodePrimitive(String value, Class<?> type) throws DecodeException
{
@ -264,55 +280,9 @@ public class AvailableDecoders implements Predicate<Class<?>>
@Override
public boolean test(Class<?> type)
{
if (registeredDecoders != null)
{
for (RegisteredDecoder registered : registeredDecoders)
{
if (registered.test(type))
return true;
}
}
// TEXT based [via Class references]
if (Boolean.class.isAssignableFrom(type) ||
Byte.class.isAssignableFrom(type) ||
Character.class.isAssignableFrom(type) ||
Double.class.isAssignableFrom(type) ||
Float.class.isAssignableFrom(type) ||
Integer.class.isAssignableFrom(type) ||
Long.class.isAssignableFrom(type) ||
String.class.isAssignableFrom(type) ||
Reader.class.isAssignableFrom(type))
{
return true;
}
// TEXT based [via Primitive reference]
if (Boolean.TYPE.isAssignableFrom(type) ||
Byte.TYPE.isAssignableFrom(type) ||
Character.TYPE.isAssignableFrom(type) ||
Double.TYPE.isAssignableFrom(type) ||
Float.TYPE.isAssignableFrom(type) ||
Integer.TYPE.isAssignableFrom(type) ||
Long.TYPE.isAssignableFrom(type))
{
return true;
}
// BINARY based
if (ByteBuffer.class.isAssignableFrom(type) ||
byte[].class.isAssignableFrom(type) ||
InputStream.class.isAssignableFrom(type))
{
return true;
}
// PONG based
if (PongMessage.class.isAssignableFrom(type))
{
return true;
}
return false;
return registeredDecoders.stream()
.filter(registered -> registered.isType(type))
.findFirst()
.isPresent();
}
}

View File

@ -20,16 +20,19 @@ package org.eclipse.jetty.websocket.jsr356.function;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
@ -45,12 +48,17 @@ import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
import org.eclipse.jetty.websocket.common.function.CommonEndpointFunctions;
import org.eclipse.jetty.websocket.common.message.PartialBinaryMessageSink;
import org.eclipse.jetty.websocket.common.message.PartialTextMessageSink;
import org.eclipse.jetty.websocket.common.reflect.Arg;
import org.eclipse.jetty.websocket.common.reflect.DynamicArgs;
import org.eclipse.jetty.websocket.common.reflect.UnorderedSignature;
import org.eclipse.jetty.websocket.common.util.ReflectUtils;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.jsr356.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.jsr356.messages.DecodedBinaryMessageSink;
import org.eclipse.jetty.websocket.jsr356.messages.DecodedTextMessageSink;
/**
* Endpoint Functions used as interface between from the parsed websocket frames
@ -116,13 +124,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
if (endpoint instanceof Endpoint)
{
Endpoint jsrEndpoint = (Endpoint) endpoint;
setOnOpen((session) -> {
setOnOpen((session) ->
{
jsrEndpoint.onOpen(session, endpointConfig);
return null;
},
ReflectUtils.findMethod(endpoint.getClass(), "onOpen", Session.class, EndpointConfig.class)
);
setOnClose((close) -> {
setOnClose((close) ->
{
CloseReason closeReason = new CloseReason(
CloseReason.CloseCodes.getCloseCode(close.getStatusCode())
, close.getReason());
@ -131,7 +141,8 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
},
ReflectUtils.findMethod(endpoint.getClass(), "onClose", Session.class, EndpointConfig.class)
);
setOnError((cause) -> {
setOnError((cause) ->
{
jsrEndpoint.onError(getSession(), cause);
return null;
},
@ -269,43 +280,170 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
// OnMessage [0..3] (TEXT / BINARY / PONG)
Method messageMethods[] = ReflectUtils.findAnnotatedMethods(endpointClass, OnMessage.class);
if (messageMethods != null && messageMethods.length > 0)
Method onMessages[] = ReflectUtils.findAnnotatedMethods(endpointClass, OnMessage.class);
if (onMessages != null && onMessages.length > 0)
{
for (Method messageMethod : messageMethods)
for (Method onMsg : onMessages)
{
// Analyze @OnMessage method declaration
// Must be a public, non-static method
ReflectUtils.assertIsPublicNonStatic(method);
ReflectUtils.assertIsPublicNonStatic(onMsg);
// If a return type is declared, it must be capable
// of being encoded with an available Encoder
Class<?> returnType = messageMethod.getReturnType();
Encoder returnEncoder = newEncoderFor(returnType);
Class<?> returnType = onMsg.getReturnType();
Encoder returnEncoder = getEncoderFor(returnType);
// Try to determine Message type (BINARY / TEXT / PONG) from signature
// Try to determine Message type (BINARY / TEXT) from signature
// Test for Whole TEXT
DynamicArgs.Builder builder = createDynamicArgs(
new Arg(Session.class),
new Arg(CloseReason.class));
Arg SESSION = new Arg(Session.class);
DynamicArgs.Signature sig = builder.getMatchingSignature(method);
if(sig != null)
// Whole TEXT ---
for (AvailableDecoders.RegisteredDecoder decoder : decoders.supporting(Decoder.Text.class))
{
UnorderedSignature sig = new UnorderedSignature(createCallArgs(SESSION, new Arg(decoder.objectType).required()));
if (sig.test(onMsg))
{
final Object[] args = newCallArgs(sig.getCallArgs());
args[0] = getSession();
BiFunction<Object,Object[],Object> invoker = sig.newFunction(onMsg);
Decoder.Text decoderInstance = getDecoderInstance(decoder, Decoder.Text.class);
DecodedTextMessageSink textSink = new DecodedTextMessageSink(
getSession(),
decoderInstance,
(msg) ->
{
args[1] = msg;
return invoker.apply(endpoint, args);
}
);
setOnText(textSink, onMsg);
break;
}
}
// Test for Whole BINARY
// Whole BINARY ---
for(AvailableDecoders.RegisteredDecoder decoder : decoders.supporting(Decoder.Binary.class))
{
UnorderedSignature sig = new UnorderedSignature(createCallArgs(SESSION, new Arg(decoder.objectType).required()));
if (sig.test(onMsg))
{
final Object[] args = newCallArgs(sig.getCallArgs());
args[0] = getSession();
BiFunction<Object,Object[],Object> invoker = sig.newFunction(onMsg);
Decoder.Binary decoderInstance = getDecoderInstance(decoder, Decoder.Binary.class);
DecodedBinaryMessageSink binarySink = new DecodedBinaryMessageSink(
getSession(),
decoderInstance,
(msg) ->
{
args[1] = msg;
return invoker.apply(endpoint, args);
}
);
setOnBinary(binarySink, onMsg);
break;
}
}
// Test for Partial TEXT
// Partial Text ---
Arg ARG_PARTIAL_BOOL = new Arg(boolean.class).required();
Arg ARG_STRING = new Arg(String.class).required();
DynamicArgs.Builder partialTextBuilder = new DynamicArgs.Builder();
partialTextBuilder.addSignature(createCallArgs(SESSION, ARG_STRING, ARG_PARTIAL_BOOL));
// Test for Partial BINARY
DynamicArgs.Signature sigPartialText = partialTextBuilder.getMatchingSignature(onMsg);
if (sigPartialText != null)
{
// Found partial text args
final Object[] args = newCallArgs(sigPartialText.getCallArgs());
// Test for Streaming TEXT
args[0] = getSession();
DynamicArgs invoker = partialTextBuilder.build(method, sigPartialText);
setOnText(new PartialTextMessageSink((partial) ->
{
args[1] = partial.getPayload();
args[2] = partial.isFin();
invoker.invoke(endpoint, args);
return null;
}), onMsg);
}
// Test for Streaming BINARY
// Partial Binary ---
Arg ARG_BYTE_ARRAY = new Arg(byte[].class).required();
Arg ARG_BYTE_BUFFER = new Arg(ByteBuffer.class).required();
DynamicArgs.Builder partialBinaryBuilder = new DynamicArgs.Builder();
partialBinaryBuilder.addSignature(createCallArgs(SESSION, ARG_BYTE_ARRAY, ARG_PARTIAL_BOOL));
partialBinaryBuilder.addSignature(createCallArgs(SESSION, ARG_BYTE_BUFFER, ARG_PARTIAL_BOOL));
DynamicArgs.Signature sigPartialBinary = partialBinaryBuilder.getMatchingSignature(onMsg);
if (sigPartialBinary != null)
{
// Found partial binary args
assertPartialMustHaveVoidReturn(onMsg);
final Object[] args = newCallArgs(sigPartialBinary.getCallArgs());
args[0] = getSession();
DynamicArgs invoker = partialBinaryBuilder.build(method, sigPartialBinary);
setOnBinary(new PartialBinaryMessageSink((partial) ->
{
args[1] = partial.getPayload();
// TODO: handle byte[] version
args[2] = partial.isFin();
invoker.invoke(endpoint, args);
return null;
}), onMsg);
}
// Streaming TEXT ---
DynamicArgs.Builder streamingTextBuilder = new DynamicArgs.Builder();
decoders.supporting(Decoder.TextStream.class)
.forEach(registeredDecoder ->
streamingTextBuilder.addSignature(
createCallArgs(
SESSION, new Arg(registeredDecoder.objectType).required()
)
)
);
DynamicArgs.Signature sigStreamingText = streamingTextBuilder.getMatchingSignature(onMsg);
if (sigStreamingText != null)
{
// TODO: Found streaming text args
assertPartialMustHaveVoidReturn(onMsg);
final Object[] args = newCallArgs(sigStreamingText.getCallArgs());
}
// Streaming BINARY ---
DynamicArgs.Builder streamingBinaryBuilder = new DynamicArgs.Builder();
decoders.supporting(Decoder.BinaryStream.class)
.forEach(registeredDecoder ->
streamingBinaryBuilder.addSignature(
createCallArgs(
SESSION, new Arg(registeredDecoder.objectType).required()
)
)
);
DynamicArgs.Signature sigStreamingBinary = streamingBinaryBuilder.getMatchingSignature(onMsg);
if (sigStreamingBinary != null)
{
// TODO: found streaming binary args!
final Object[] args = newCallArgs(sigStreamingBinary.getCallArgs());
DynamicArgs invoker = streamingBinaryBuilder.build(method, sigStreamingBinary);
/*
setOnBinary(new DecodedBinaryMessageSink<String>(
(msg) ->
{
args[0] = getSession();
args[1] = msg;
invoker.invoke(endpoint, args);
return null;
}), onMsg);
*/
}
// Test for PONG
@ -329,13 +467,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
}
private Encoder newEncoderFor(Class<?> type)
private Encoder getEncoderFor(Class<?> type)
{
if ((type == Void.TYPE) || (type == Void.class))
{
return null;
}
// TODO: return a pre-initialized encoder from past call
Class<? extends Encoder> encoderClass = encoders.getEncoderFor(type);
if (encoderClass == null)
{
@ -354,6 +494,23 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
}
private <T extends Decoder> T getDecoderInstance(AvailableDecoders.RegisteredDecoder registeredDecoder, Class<T> interfaceType)
{
// TODO: Return previous instantiated decoders here
Class<? extends Decoder> decoderClass = registeredDecoder.decoder;
try
{
Decoder decoder = decoderClass.newInstance();
decoder.init(this.endpointConfig);
return (T) decoder;
}
catch (Throwable t)
{
throw new InvalidWebSocketException("Unable to initialize required Decoder: " + decoderClass.getName(), t);
}
}
private void assertSignatureValid(DynamicArgs.Signature sig, Class<? extends Annotation> annotationClass, Method method)
{
if (sig != null)
@ -366,7 +523,25 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
throw new InvalidSignatureException(err.toString());
}
private Object[] newCallArgs(Arg[] callArgs) throws DecodeException
private void assertPartialMustHaveVoidReturn(Method method)
{
if (method.getReturnType().isAssignableFrom(Void.TYPE))
{
return;
}
StringBuilder err = new StringBuilder();
err.append("Partial @OnMessage handlers must be void return type: ");
ReflectUtils.append(err, endpoint.getClass(), method);
throw new InvalidSignatureException(err.toString());
}
public AvailableDecoders getAvailableDecoders()
{
return decoders;
}
protected Object[] newCallArgs(Arg[] callArgs) throws DecodeException
{
int len = callArgs.length;
Object[] args = new Object[callArgs.length];
@ -397,6 +572,13 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
private DynamicArgs.Builder createDynamicArgs(Arg... args)
{
DynamicArgs.Builder argBuilder = new DynamicArgs.Builder();
Arg[] callArgs = createCallArgs(args);
argBuilder.addSignature(callArgs);
return argBuilder;
}
protected Arg[] createCallArgs(Arg... args)
{
int argCount = args.length;
if (this.staticArgs != null)
argCount += this.staticArgs.size();
@ -418,9 +600,6 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
callArgs[idx++] = new Arg(staticArg.value.getClass()).setTag(staticArg.name);
}
}
argBuilder.addSignature(callArgs);
return argBuilder;
return callArgs;
}
}

View File

@ -24,38 +24,35 @@ import java.util.function.Function;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
import org.eclipse.jetty.websocket.common.reflect.Arg;
import org.eclipse.jetty.websocket.common.reflect.DynamicArgs;
import org.eclipse.jetty.websocket.common.util.ReflectUtils;
/**
* javax.websocket {@link OnMessage} method {@link Function} for TEXT/{@link String} types
*/
@Deprecated
public class JsrOnTextFunction implements Function<String, Void>
public class JsrOnTextFunction<T> implements Function<T, Object>
{
/* private static final DynamicArgs.Builder ARGBUILDER;
private static final int SESSION = 1;
private static final int TEXT = 2;
private static final DynamicArgs.Builder ARGBUILDER;
private static final Arg SESSION = new Arg(Session.class);
private static final Arg TEXT = new Arg(String.class).required();
static
{
ARGBUILDER = new DynamicArgs.Builder();
ARGBUILDER.addSignature(new ExactSignature(String.class).indexedAs(TEXT));
ARGBUILDER.addSignature(new ExactSignature(Session.class,String.class).indexedAs(SESSION,TEXT));
}
public static DynamicArgs.Builder getDynamicArgsBuilder()
{
return ARGBUILDER;
ARGBUILDER.addSignature(SESSION, TEXT);
}
public static boolean hasMatchingSignature(Method method)
{
return ARGBUILDER.hasMatchingSignature(method);
}*/
}
private final Session session;
private final Object endpoint;
private final Method method;
private final DynamicArgs callable;
public JsrOnTextFunction(Session session, Object endpoint, Method method)
{
@ -67,26 +64,16 @@ public class JsrOnTextFunction implements Function<String, Void>
ReflectUtils.assertIsPublicNonStatic(method);
ReflectUtils.assertIsReturn(method, Void.TYPE);
/*this.callable = ARGBUILDER.build(method);
this.callable = ARGBUILDER.build(method, SESSION, TEXT);
if (this.callable == null)
{
throw InvalidSignatureException.build(method, OnMessage.class, ARGBUILDER);
}
this.callable.setArgReferences(SESSION,TEXT);*/
}
@Override
public Void apply(String text)
public Object apply(T text)
{
/*Object args[] = this.callable.toArgs(session,text);
try
{
method.invoke(endpoint,args);
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e)
{
throw new WebSocketException("Unable to call text message method " + ReflectUtils.toString(endpoint.getClass(),method),e);
}*/
return null;
return this.callable.invoke(endpoint, session, text);
}
}

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.function;
import java.lang.reflect.Method;
import javax.websocket.Decoder;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.common.reflect.Arg;
import org.eclipse.jetty.websocket.common.reflect.DynamicArgs;
/**
* Possible Text Endpoint Functions
*/
public class JsrWholeTextEndpoint
{
private final Arg SESSION = new Arg(Session.class);
private DynamicArgs.Builder ARGBUILDER;
public JsrWholeTextEndpoint(JsrEndpointFunctions jsrFunctions)
{
ARGBUILDER = new DynamicArgs.Builder();
jsrFunctions.getAvailableDecoders().supporting(Decoder.Text.class)
.forEach(registeredDecoder ->
ARGBUILDER.addSignature(
jsrFunctions.createCallArgs(
SESSION, new Arg(registeredDecoder.objectType).required()
)
)
);
}
public DynamicArgs.Signature getMatchingSignature(Method onMsg)
{
return ARGBUILDER.getMatchingSignature(onMsg);
}
}

View File

@ -0,0 +1,61 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.messages;
import java.io.IOException;
import java.util.function.Function;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EncodeException;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.message.ByteBufferMessageSink;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
public class DecodedBinaryMessageSink extends ByteBufferMessageSink
{
public DecodedBinaryMessageSink(JsrSession session, Decoder.Binary decoder, Function<Object, Object> onMessageFunction)
{
super(session.getPolicy(), (byteBuf) ->
{
try
{
Object decoded = null;
decoded = decoder.decode(byteBuf);
// notify event
Object ret = onMessageFunction.apply(decoded);
if (ret != null)
{
// send response
session.getBasicRemote().sendObject(ret);
}
return null;
}
catch (DecodeException | EncodeException | IOException e)
{
throw new WebSocketException(e);
}
});
}
}

View File

@ -0,0 +1,59 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.messages;
import java.io.IOException;
import java.util.function.Function;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EncodeException;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.message.StringMessageSink;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
public class DecodedTextMessageSink extends StringMessageSink
{
public DecodedTextMessageSink(JsrSession session, Decoder.Text decoder, Function<Object, Object> onMessageFunction)
{
super(session.getPolicy(), (message) ->
{
try
{
Object decoded = decoder.decode(message);
// notify event
Object ret = onMessageFunction.apply(decoded);
if (ret != null)
{
// send response
session.getBasicRemote().sendObject(ret);
}
return null;
}
catch (DecodeException | EncodeException | IOException e)
{
throw new WebSocketException(e);
}
});
}
}

View File

@ -18,13 +18,16 @@
package org.eclipse.jetty.websocket.jsr356.function;
import java.lang.reflect.InvocationTargetException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
@ -71,7 +74,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
return new JsrSession(container, id, requestURI, ei, connection);
}
private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws InvocationTargetException, IllegalAccessException
private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception
{
JsrEndpointFunctions endpointFunctions = new JsrEndpointFunctions(
socket, container.getPolicy(),
@ -81,6 +84,9 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
uriParams,
endpointConfig
);
endpointFunctions.start();
assertThat("Has BinarySink", endpointFunctions.hasBinarySink(), is(true));
// This invocation is the same for all tests
ByteBuffer byteBuffer = ByteBuffer.wrap("Hello World".getBytes(StandardCharsets.UTF_8));
@ -89,9 +95,11 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
socket.assertEvent(String.format(expectedEventFormat, args));
}
@ClientEndpoint
public static class MessageSocket extends TrackingSocket
{
// TODO: Ambiguous declaration
// Invalid OnMessage - mandatory type (TEXT/BINARY) missing
@SuppressWarnings("IncorrectOnMessageMethodsInspection")
@OnMessage
public void onMessage()
{
@ -100,29 +108,31 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
}
@Test
public void testInvokeMessage() throws InvocationTargetException, IllegalAccessException
public void testInvokeMessage() throws Exception
{
assertOnMessageInvocation(new MessageSocket(), "onMessage()");
}
public static class MessageTextSocket extends TrackingSocket
@ClientEndpoint
public static class MessageByteBufferSocket extends TrackingSocket
{
@OnMessage
public void onMessage(String msg)
public void onMessage(ByteBuffer msg)
{
addEvent("onMessage(%s)", msg);
addEvent("onMessage(%s)", BufferUtil.toUTF8String(msg));
}
}
@Test
public void testInvokeMessageText() throws InvocationTargetException, IllegalAccessException
public void testInvokeMessageByteBuffer() throws Exception
{
assertOnMessageInvocation(new MessageTextSocket(), "onMessage(Hello World)");
assertOnMessageInvocation(new MessageByteBufferSocket(), "onMessage(Hello World)");
}
@ClientEndpoint
public static class MessageSessionSocket extends TrackingSocket
{
// TODO: Ambiguous declaration
// Invalid OnMessage - mandatory type (TEXT/BINARY) missing
@OnMessage
public void onMessage(Session session)
{
@ -131,28 +141,28 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
}
@Test
public void testInvokeMessageSession() throws InvocationTargetException, IllegalAccessException
public void testInvokeMessageSession() throws Exception
{
assertOnMessageInvocation(new MessageSessionSocket(),
"onMessage(JsrSession[CLIENT,%s,DummyConnection])",
MessageSessionSocket.class.getName());
}
public static class MessageSessionTextSocket extends TrackingSocket
@ClientEndpoint
public static class MessageSessionByteBufferSocket extends TrackingSocket
{
@OnMessage
public void onMessage(Session session, String msg)
public void onMessage(Session session, ByteBuffer msg)
{
addEvent("onMessage(%s, %s)", session, msg);
addEvent("onMessage(%s, %s)", session, BufferUtil.toUTF8String(msg));
}
}
@Test
public void testInvokeMessageSessionText() throws InvocationTargetException, IllegalAccessException
public void testInvokeMessageSessionByteBuffer() throws Exception
{
assertOnMessageInvocation(new MessageSessionTextSocket(),
assertOnMessageInvocation(new MessageSessionByteBufferSocket(),
"onMessage(JsrSession[CLIENT,%s,DummyConnection], Hello World)",
MessageSessionTextSocket.class.getName());
MessageSessionByteBufferSocket.class.getName());
}
}

View File

@ -114,19 +114,22 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
if (endpoint instanceof WebSocketConnectionListener)
{
WebSocketConnectionListener listener = (WebSocketConnectionListener) endpoint;
setOnOpen((session) -> {
setOnOpen((session) ->
{
listener.onWebSocketConnect(session);
return null;
},
ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketConnect", Session.class)
);
setOnClose((close) -> {
setOnClose((close) ->
{
listener.onWebSocketClose(close.getStatusCode(), close.getReason());
return null;
},
ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketClose", int.class, String.class)
);
setOnError((cause) -> {
setOnError((cause) ->
{
listener.onWebSocketError(cause);
return null;
},
@ -139,12 +142,14 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
{
WebSocketListener listener = (WebSocketListener) endpoint;
setOnText(new StringMessageSink(policy, (payload) -> {
setOnText(new StringMessageSink(policy, (payload) ->
{
listener.onWebSocketText(payload);
return null;
}),
ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketText", String.class));
setOnBinary(new ByteArrayMessageSink(policy, (payload) -> {
setOnBinary(new ByteArrayMessageSink(policy, (payload) ->
{
listener.onWebSocketBinary(payload, 0, payload.length);
return null;
}),
@ -156,7 +161,8 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
if (endpoint instanceof WebSocketPingPongListener)
{
WebSocketPingPongListener listener = (WebSocketPingPongListener) endpoint;
setOnPong((pong) -> {
setOnPong((pong) ->
{
ByteBuffer payload = pong;
if (pong == null)
payload = BufferUtil.EMPTY_BUFFER;
@ -164,7 +170,8 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
return null;
},
ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPong", ByteBuffer.class));
setOnPing((ping) -> {
setOnPing((ping) ->
{
ByteBuffer payload = ping;
if (ping == null)
payload = BufferUtil.EMPTY_BUFFER;
@ -179,12 +186,14 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
if (endpoint instanceof WebSocketPartialListener)
{
WebSocketPartialListener listener = (WebSocketPartialListener) endpoint;
setOnText(new PartialTextMessageSink((partial) -> {
setOnText(new PartialTextMessageSink((partial) ->
{
listener.onWebSocketPartialText(partial.getPayload(), partial.isFin());
return null;
}),
ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPartialText", String.class, boolean.class));
setOnBinary(new PartialBinaryMessageSink((partial) -> {
setOnBinary(new PartialBinaryMessageSink((partial) ->
{
listener.onWebSocketPartialBinary(partial.getPayload(), partial.isFin());
return null;
}),
@ -196,7 +205,8 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
if (endpoint instanceof WebSocketFrameListener)
{
WebSocketFrameListener listener = (WebSocketFrameListener) endpoint;
setOnFrame((frame) -> {
setOnFrame((frame) ->
{
listener.onWebSocketFrame(new ReadOnlyDelegatedFrame(frame));
return null;
},
@ -385,6 +395,16 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
}
}
public boolean hasBinarySink()
{
return this.onBinarySink != null;
}
public boolean hasTextSink()
{
return this.onTextSink != null;
}
private String describeOrigin(Object obj)
{
if (obj == null)

View File

@ -18,58 +18,20 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
public class ByteBufferMessageSink implements MessageSink
public class ByteBufferMessageSink extends ByteArrayMessageSink
{
private static final int BUFFER_SIZE = 65535;
private final WebSocketPolicy policy;
private final Function<ByteBuffer, Void> onMessageFunction;
private ByteArrayOutputStream out;
private int size;
public ByteBufferMessageSink(WebSocketPolicy policy, Function<ByteBuffer, Void> onMessageFunction)
{
this.policy = policy;
this.onMessageFunction = onMessageFunction;
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
super(policy, (byteArray) ->
{
try
{
if (payload != null)
{
policy.assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload,out);
}
}
catch (IOException e)
{
throw new RuntimeException("Unable to append Binary Message", e);
}
finally
{
if (fin)
{
ByteBuffer bbuf = ByteBuffer.wrap(out.toByteArray());
ByteBuffer bbuf = ByteBuffer.wrap(byteArray);
onMessageFunction.apply(bbuf);
// reset
out = null;
size = 0;
}
}
return null;
});
}
}

View File

@ -43,6 +43,7 @@ public class StringMessageSink implements MessageSink
this.size = 0;
}
@SuppressWarnings("Duplicates")
@Override
public void accept(ByteBuffer payload, Boolean fin)
{

View File

@ -166,7 +166,7 @@ public class DynamicArgs
*/
private final BiFunction<Object, Object[], Object> invoker;
private DynamicArgs(BiFunction<Object, Object[], Object> invoker)
public DynamicArgs(BiFunction<Object, Object[], Object> invoker)
{
this.invoker = invoker;
}

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.reflect.DynamicArgs.Signature;
import org.eclipse.jetty.websocket.common.util.ReflectUtils;
class UnorderedSignature implements Signature, Predicate<Method>
public class UnorderedSignature implements Signature, Predicate<Method>
{
private class SelectedArg extends Arg
{
@ -52,17 +52,17 @@ class UnorderedSignature implements Signature, Predicate<Method>
}
private final static Logger LOG = Log.getLogger(UnorderedSignature.class);
private final Arg[] params;
private final Arg[] callArgs;
public UnorderedSignature(Arg... args)
{
this.params = args;
this.callArgs = args;
}
@Override
public Arg[] getCallArgs()
{
return this.params;
return this.callArgs;
}
@Override
@ -74,14 +74,14 @@ class UnorderedSignature implements Signature, Predicate<Method>
@Override
public boolean test(Method method)
{
return getArgMapping(method, false, params) != null;
return getArgMapping(method, false, callArgs) != null;
}
public void appendDescription(StringBuilder str)
{
str.append('(');
boolean delim = false;
for (Arg arg : params)
for (Arg arg : callArgs)
{
if (delim)
{
@ -229,6 +229,20 @@ class UnorderedSignature implements Signature, Predicate<Method>
return new UnorderedParamsFunction(method, argMapping);
}
/**
* Generate BiFunction for this signature.
*
* @param method the method to get the invoker function for
* @return BiFunction of Endpoint Object, Call Args, Return Type
*/
public BiFunction<Object, Object[], Object> newFunction(Method method)
{
int argMapping[] = getArgMapping(method, true, this.callArgs);
// Return function capable of calling method
return new UnorderedParamsFunction(method, argMapping);
}
public static class UnorderedParamsFunction
implements BiFunction<Object, Object[], Object>
{