diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/decoders/AvailableDecoders.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/decoders/AvailableDecoders.java index 97a1bd964e9..54059be6805 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/decoders/AvailableDecoders.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/decoders/AvailableDecoders.java @@ -18,16 +18,16 @@ package org.eclipse.jetty.websocket.jsr356.decoders; -import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.NoSuchElementException; import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.websocket.DecodeException; import javax.websocket.Decoder; -import javax.websocket.PongMessage; import org.eclipse.jetty.websocket.api.InvalidWebSocketException; import org.eclipse.jetty.websocket.common.InvalidSignatureException; @@ -35,85 +35,128 @@ import org.eclipse.jetty.websocket.common.util.ReflectUtils; public class AvailableDecoders implements Predicate> { - private static class RegisteredDecoder implements Predicate> + public static class RegisteredDecoder { public final Class decoder; + public final Class interfaceType; public final Class objectType; - - public RegisteredDecoder(Class decoder, Class objectType) + + public RegisteredDecoder(Class decoder, Class interfaceType, Class objectType) { this.decoder = decoder; + this.interfaceType = interfaceType; this.objectType = objectType; } - - @Override - public boolean test(Class type) + + public boolean implementsInterface(Class type) + { + return interfaceType.isAssignableFrom(type); + } + + public boolean isType(Class type) { return objectType.isAssignableFrom(type); } } - - private List registeredDecoders; - + + private LinkedList registeredDecoders; + + public AvailableDecoders() + { + registeredDecoders = new LinkedList<>(); + + // TEXT based [via Class reference] + register(BooleanDecoder.class, Decoder.Text.class, Boolean.class); + register(ByteDecoder.class, Decoder.Text.class, Byte.class); + register(CharacterDecoder.class, Decoder.Text.class, Character.class); + register(DoubleDecoder.class, Decoder.Text.class, Double.class); + register(FloatDecoder.class, Decoder.Text.class, Float.class); + register(IntegerDecoder.class, Decoder.Text.class, Integer.class); + register(LongDecoder.class, Decoder.Text.class, Long.class); + register(StringDecoder.class, Decoder.Text.class, String.class); + + // TEXT based [via Primitive reference] + register(BooleanDecoder.class, Decoder.Text.class, Boolean.TYPE); + register(ByteDecoder.class, Decoder.Text.class, Byte.TYPE); + register(CharacterDecoder.class, Decoder.Text.class, Character.TYPE); + register(DoubleDecoder.class, Decoder.Text.class, Double.TYPE); + register(FloatDecoder.class, Decoder.Text.class, Float.TYPE); + register(IntegerDecoder.class, Decoder.Text.class, Integer.TYPE); + register(LongDecoder.class, Decoder.Text.class, Long.TYPE); + + // BINARY based + register(ByteBufferDecoder.class, Decoder.Binary.class, ByteBuffer.class); + register(ByteArrayDecoder.class, Decoder.Binary.class, byte[].class); + + // STREAMING based + register(ReaderDecoder.class, Decoder.TextStream.class, Reader.class); + register(InputStreamDecoder.class, Decoder.BinaryStream.class, InputStreamDecoder.class); + } + + private void register(Class decoderClass, Class interfaceType, Class type) + { + registeredDecoders.add(new RegisteredDecoder(decoderClass, interfaceType, type)); + } + public void register(Class decoder) { if (!ReflectUtils.isDefaultConstructable(decoder)) { throw new InvalidSignatureException("Decoder must have public, no-args constructor: " + decoder.getName()); } - + boolean foundDecoder = false; - + if (Decoder.Binary.class.isAssignableFrom(decoder)) { add(decoder, Decoder.Binary.class); foundDecoder = true; } - + if (Decoder.BinaryStream.class.isAssignableFrom(decoder)) { add(decoder, Decoder.BinaryStream.class); foundDecoder = true; } - + if (Decoder.Text.class.isAssignableFrom(decoder)) { add(decoder, Decoder.Text.class); foundDecoder = true; } - + if (Decoder.TextStream.class.isAssignableFrom(decoder)) { add(decoder, Decoder.TextStream.class); foundDecoder = true; } - + if (!foundDecoder) { throw new InvalidSignatureException("Not a valid Decoder class: " + decoder.getName() + " implements no " + Decoder.class.getName() + " interfaces"); } } - + public void registerAll(Class[] decoders) { if (decoders == null) return; - + for (Class decoder : decoders) { register(decoder); } } - + public void registerAll(List> decoders) { if (decoders == null) return; - + decoders.forEach(this::register); } - - private void add(Class decoder, Class interfaceClass) + + private void add(Class decoder, Class interfaceClass) { Class objectType = ReflectUtils.findGenericClassFor(decoder, interfaceClass); if (objectType == null) @@ -125,69 +168,42 @@ public class AvailableDecoders implements Predicate> err.append(decoder); throw new InvalidWebSocketException(err.toString()); } - - if (registeredDecoders == null) - registeredDecoders = new ArrayList<>(); - - registeredDecoders.add(new RegisteredDecoder(decoder, objectType)); + + registeredDecoders.add(new RegisteredDecoder(decoder, interfaceClass, objectType)); } - + + public List supporting(Class interfaceType) + { + return registeredDecoders.stream() + .filter(registered -> registered.implementsInterface(interfaceType)) + .collect(Collectors.toList()); + } + public Class getDecoderFor(Class type) { - // Check registered decoders first - if (registeredDecoders != null) + try { - for (RegisteredDecoder registered : registeredDecoders) - { - if (registered.objectType.isAssignableFrom(type)) - return registered.decoder; - } + return registeredDecoders.stream() + .filter(registered -> registered.isType(type)) + .findFirst() + .get() + .decoder; + } + catch (NoSuchElementException e) + { + throw new InvalidWebSocketException("No Decoder found for type " + type); } - - // Check default decoders next - - // TEXT based [via Class reference] - if (Boolean.class.isAssignableFrom(type)) return BooleanDecoder.class; - if (Byte.class.isAssignableFrom(type)) return ByteDecoder.class; - if (Character.class.isAssignableFrom(type)) return CharacterDecoder.class; - if (Double.class.isAssignableFrom(type)) return DoubleDecoder.class; - if (Float.class.isAssignableFrom(type)) return FloatDecoder.class; - if (Integer.class.isAssignableFrom(type)) return IntegerDecoder.class; - if (Long.class.isAssignableFrom(type)) return LongDecoder.class; - if (String.class.isAssignableFrom(type)) return StringDecoder.class; - - // TEXT based [via Primitive reference] - if (Boolean.TYPE.isAssignableFrom(type)) return BooleanDecoder.class; - if (Byte.TYPE.isAssignableFrom(type)) return ByteDecoder.class; - if (Character.TYPE.isAssignableFrom(type)) return CharacterDecoder.class; - if (Double.TYPE.isAssignableFrom(type)) return DoubleDecoder.class; - if (Float.TYPE.isAssignableFrom(type)) return FloatDecoder.class; - if (Integer.TYPE.isAssignableFrom(type)) return IntegerDecoder.class; - if (Long.TYPE.isAssignableFrom(type)) return LongDecoder.class; - - // BINARY based - if (ByteBuffer.class.isAssignableFrom(type)) return ByteBufferDecoder.class; - if (byte[].class.isAssignableFrom(type)) return ByteArrayDecoder.class; - - // PONG based - if (PongMessage.class.isAssignableFrom(type)) return PongMessageDecoder.class; - - // STREAMING based - if (Reader.class.isAssignableFrom(type)) return ReaderDecoder.class; - if (InputStream.class.isAssignableFrom(type)) return InputStreamDecoder.class; - - throw new InvalidWebSocketException("No Decoder found for type " + type); } - + public static Object decodePrimitive(String value, Class type) throws DecodeException { if (value == null) return null; - + // Simplest (and most common) form of @PathParam if (String.class.isAssignableFrom(type)) return value; - + try { // Per JSR356 spec, just the java primitives @@ -251,7 +267,7 @@ public class AvailableDecoders implements Predicate> { return Long.parseLong(value); } - + // Not a primitive! throw new DecodeException(value, "Not a recognized primitive type: " + type); } @@ -260,59 +276,13 @@ public class AvailableDecoders implements Predicate> throw new DecodeException(value, "Unable to decode as type " + type.getName()); } } - + @Override public boolean test(Class type) { - if (registeredDecoders != null) - { - for (RegisteredDecoder registered : registeredDecoders) - { - if (registered.test(type)) - return true; - } - } - - // TEXT based [via Class references] - if (Boolean.class.isAssignableFrom(type) || - Byte.class.isAssignableFrom(type) || - Character.class.isAssignableFrom(type) || - Double.class.isAssignableFrom(type) || - Float.class.isAssignableFrom(type) || - Integer.class.isAssignableFrom(type) || - Long.class.isAssignableFrom(type) || - String.class.isAssignableFrom(type) || - Reader.class.isAssignableFrom(type)) - { - return true; - } - - // TEXT based [via Primitive reference] - if (Boolean.TYPE.isAssignableFrom(type) || - Byte.TYPE.isAssignableFrom(type) || - Character.TYPE.isAssignableFrom(type) || - Double.TYPE.isAssignableFrom(type) || - Float.TYPE.isAssignableFrom(type) || - Integer.TYPE.isAssignableFrom(type) || - Long.TYPE.isAssignableFrom(type)) - { - return true; - } - - // BINARY based - if (ByteBuffer.class.isAssignableFrom(type) || - byte[].class.isAssignableFrom(type) || - InputStream.class.isAssignableFrom(type)) - { - return true; - } - - // PONG based - if (PongMessage.class.isAssignableFrom(type)) - { - return true; - } - - return false; + return registeredDecoders.stream() + .filter(registered -> registered.isType(type)) + .findFirst() + .isPresent(); } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions.java index 399dc5e5edd..1b057a28c38 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions.java @@ -20,16 +20,19 @@ package org.eclipse.jetty.websocket.jsr356.function; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.websocket.ClientEndpoint; import javax.websocket.CloseReason; import javax.websocket.DecodeException; +import javax.websocket.Decoder; import javax.websocket.Encoder; import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; @@ -45,12 +48,17 @@ import org.eclipse.jetty.websocket.api.InvalidWebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.common.InvalidSignatureException; import org.eclipse.jetty.websocket.common.function.CommonEndpointFunctions; +import org.eclipse.jetty.websocket.common.message.PartialBinaryMessageSink; +import org.eclipse.jetty.websocket.common.message.PartialTextMessageSink; import org.eclipse.jetty.websocket.common.reflect.Arg; import org.eclipse.jetty.websocket.common.reflect.DynamicArgs; +import org.eclipse.jetty.websocket.common.reflect.UnorderedSignature; import org.eclipse.jetty.websocket.common.util.ReflectUtils; import org.eclipse.jetty.websocket.jsr356.JsrSession; import org.eclipse.jetty.websocket.jsr356.decoders.AvailableDecoders; import org.eclipse.jetty.websocket.jsr356.encoders.AvailableEncoders; +import org.eclipse.jetty.websocket.jsr356.messages.DecodedBinaryMessageSink; +import org.eclipse.jetty.websocket.jsr356.messages.DecodedTextMessageSink; /** * Endpoint Functions used as interface between from the parsed websocket frames @@ -59,7 +67,7 @@ import org.eclipse.jetty.websocket.jsr356.encoders.AvailableEncoders; public class JsrEndpointFunctions extends CommonEndpointFunctions { private static final Logger LOG = Log.getLogger(JsrEndpointFunctions.class); - + /** * Represents a static value (as seen from a URI PathParam) *

@@ -72,25 +80,25 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions { public final String name; public final String value; - + public StaticArg(String name, String value) { this.name = name; this.value = value; } - + @Override public int compare(StaticArg o1, StaticArg o2) { return o1.name.compareTo(o2.name); } } - + private final AvailableEncoders encoders; private final AvailableDecoders decoders; private final EndpointConfig endpointConfig; private List staticArgs; - + public JsrEndpointFunctions(Object endpoint, WebSocketPolicy policy, Executor executor, AvailableEncoders encoders, AvailableDecoders decoders, Map uriParams, EndpointConfig endpointConfig) @@ -99,7 +107,7 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions this.encoders = encoders; this.decoders = decoders; this.endpointConfig = endpointConfig; - + if (uriParams != null) { this.staticArgs = new ArrayList<>(); @@ -109,20 +117,22 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions .collect(Collectors.toList())); } } - + @Override protected void discoverEndpointFunctions(Object endpoint) { if (endpoint instanceof Endpoint) { Endpoint jsrEndpoint = (Endpoint) endpoint; - setOnOpen((session) -> { + setOnOpen((session) -> + { jsrEndpoint.onOpen(session, endpointConfig); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onOpen", Session.class, EndpointConfig.class) ); - setOnClose((close) -> { + setOnClose((close) -> + { CloseReason closeReason = new CloseReason( CloseReason.CloseCodes.getCloseCode(close.getStatusCode()) , close.getReason()); @@ -131,22 +141,23 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions }, ReflectUtils.findMethod(endpoint.getClass(), "onClose", Session.class, EndpointConfig.class) ); - setOnError((cause) -> { + setOnError((cause) -> + { jsrEndpoint.onError(getSession(), cause); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onError", Session.class, EndpointConfig.class) ); - + // If using an Endpoint, there's nothing else left to map at this point. // Eventually, the endpoint should call .addMessageHandler() to declare // the various TEXT / BINARY / PONG message functions return; } - + discoverAnnotatedEndpointFunctions(endpoint); } - + /** * Generic discovery of annotated endpoint functions. * @@ -155,15 +166,15 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions protected void discoverAnnotatedEndpointFunctions(Object endpoint) { Class endpointClass = endpoint.getClass(); - + // Use the JSR/Client annotation ClientEndpoint websocket = endpointClass.getAnnotation(ClientEndpoint.class); - + if (websocket != null) { encoders.registerAll(websocket.encoders()); decoders.registerAll(websocket.decoders()); - + // From here, the discovery of endpoint method is standard across // both JSR356/Client and JSR356/Server endpoints try @@ -176,7 +187,7 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions } } } - + /** * JSR356 Specific discovery of Annotated Endpoint Methods * @@ -186,22 +197,22 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions { Class endpointClass = endpoint.getClass(); Method method = null; - + // OnOpen [0..1] method = ReflectUtils.findAnnotatedMethod(endpointClass, OnOpen.class); if (method != null) { ReflectUtils.assertIsPublicNonStatic(method); ReflectUtils.assertIsReturn(method, Void.TYPE); - + // Analyze @OnOpen method declaration techniques DynamicArgs.Builder builder = createDynamicArgs( new Arg(Session.class), new Arg(EndpointConfig.class)); - + DynamicArgs.Signature sig = builder.getMatchingSignature(method); assertSignatureValid(sig, OnOpen.class, method); - + final Object[] args = newCallArgs(sig.getCallArgs()); DynamicArgs invoker = builder.build(method, sig); setOnOpen((jsrSession) -> @@ -212,22 +223,22 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions return null; }, method); } - + // OnClose [0..1] method = ReflectUtils.findAnnotatedMethod(endpointClass, OnClose.class); if (method != null) { ReflectUtils.assertIsPublicNonStatic(method); ReflectUtils.assertIsReturn(method, Void.TYPE); - + // Analyze @OnClose method declaration techniques DynamicArgs.Builder builder = createDynamicArgs( new Arg(Session.class), new Arg(CloseReason.class)); - + DynamicArgs.Signature sig = builder.getMatchingSignature(method); assertSignatureValid(sig, OnClose.class, method); - + final Object[] args = newCallArgs(sig.getCallArgs()); DynamicArgs invoker = builder.build(method, sig); setOnClose((closeInfo) -> @@ -241,22 +252,22 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions return null; }, method); } - + // OnError [0..1] method = ReflectUtils.findAnnotatedMethod(endpointClass, OnError.class); if (method != null) { ReflectUtils.assertIsPublicNonStatic(method); ReflectUtils.assertIsReturn(method, Void.TYPE); - + // Analyze @OnError method declaration techniques DynamicArgs.Builder builder = createDynamicArgs( new Arg(Session.class), new Arg(Throwable.class)); - + DynamicArgs.Signature sig = builder.getMatchingSignature(method); assertSignatureValid(sig, OnError.class, method); - + final Object[] args = newCallArgs(sig.getCallArgs()); DynamicArgs invoker = builder.build(method, sig); setOnError((cause) -> @@ -267,48 +278,175 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions return null; }, method); } - + // OnMessage [0..3] (TEXT / BINARY / PONG) - Method messageMethods[] = ReflectUtils.findAnnotatedMethods(endpointClass, OnMessage.class); - if (messageMethods != null && messageMethods.length > 0) + Method onMessages[] = ReflectUtils.findAnnotatedMethods(endpointClass, OnMessage.class); + if (onMessages != null && onMessages.length > 0) { - for (Method messageMethod : messageMethods) + for (Method onMsg : onMessages) { // Analyze @OnMessage method declaration - + // Must be a public, non-static method - ReflectUtils.assertIsPublicNonStatic(method); - + ReflectUtils.assertIsPublicNonStatic(onMsg); + // If a return type is declared, it must be capable // of being encoded with an available Encoder - Class returnType = messageMethod.getReturnType(); - Encoder returnEncoder = newEncoderFor(returnType); - - // Try to determine Message type (BINARY / TEXT / PONG) from signature - - // Test for Whole TEXT - DynamicArgs.Builder builder = createDynamicArgs( - new Arg(Session.class), - new Arg(CloseReason.class)); - - DynamicArgs.Signature sig = builder.getMatchingSignature(method); - if(sig != null) + Class returnType = onMsg.getReturnType(); + Encoder returnEncoder = getEncoderFor(returnType); + + // Try to determine Message type (BINARY / TEXT) from signature + + Arg SESSION = new Arg(Session.class); + + // Whole TEXT --- + for (AvailableDecoders.RegisteredDecoder decoder : decoders.supporting(Decoder.Text.class)) { - + UnorderedSignature sig = new UnorderedSignature(createCallArgs(SESSION, new Arg(decoder.objectType).required())); + if (sig.test(onMsg)) + { + final Object[] args = newCallArgs(sig.getCallArgs()); + args[0] = getSession(); + BiFunction invoker = sig.newFunction(onMsg); + Decoder.Text decoderInstance = getDecoderInstance(decoder, Decoder.Text.class); + DecodedTextMessageSink textSink = new DecodedTextMessageSink( + getSession(), + decoderInstance, + (msg) -> + { + args[1] = msg; + return invoker.apply(endpoint, args); + } + ); + setOnText(textSink, onMsg); + break; + } } - - // Test for Whole BINARY - - // Test for Partial TEXT - - // Test for Partial BINARY - - // Test for Streaming TEXT - - // Test for Streaming BINARY - + + // Whole BINARY --- + for(AvailableDecoders.RegisteredDecoder decoder : decoders.supporting(Decoder.Binary.class)) + { + UnorderedSignature sig = new UnorderedSignature(createCallArgs(SESSION, new Arg(decoder.objectType).required())); + if (sig.test(onMsg)) + { + final Object[] args = newCallArgs(sig.getCallArgs()); + args[0] = getSession(); + BiFunction invoker = sig.newFunction(onMsg); + Decoder.Binary decoderInstance = getDecoderInstance(decoder, Decoder.Binary.class); + DecodedBinaryMessageSink binarySink = new DecodedBinaryMessageSink( + getSession(), + decoderInstance, + (msg) -> + { + args[1] = msg; + return invoker.apply(endpoint, args); + } + ); + setOnBinary(binarySink, onMsg); + break; + } + } + + // Partial Text --- + Arg ARG_PARTIAL_BOOL = new Arg(boolean.class).required(); + Arg ARG_STRING = new Arg(String.class).required(); + DynamicArgs.Builder partialTextBuilder = new DynamicArgs.Builder(); + partialTextBuilder.addSignature(createCallArgs(SESSION, ARG_STRING, ARG_PARTIAL_BOOL)); + + DynamicArgs.Signature sigPartialText = partialTextBuilder.getMatchingSignature(onMsg); + if (sigPartialText != null) + { + // Found partial text args + final Object[] args = newCallArgs(sigPartialText.getCallArgs()); + + args[0] = getSession(); + DynamicArgs invoker = partialTextBuilder.build(method, sigPartialText); + setOnText(new PartialTextMessageSink((partial) -> + { + args[1] = partial.getPayload(); + args[2] = partial.isFin(); + invoker.invoke(endpoint, args); + return null; + }), onMsg); + } + + // Partial Binary --- + Arg ARG_BYTE_ARRAY = new Arg(byte[].class).required(); + Arg ARG_BYTE_BUFFER = new Arg(ByteBuffer.class).required(); + DynamicArgs.Builder partialBinaryBuilder = new DynamicArgs.Builder(); + partialBinaryBuilder.addSignature(createCallArgs(SESSION, ARG_BYTE_ARRAY, ARG_PARTIAL_BOOL)); + partialBinaryBuilder.addSignature(createCallArgs(SESSION, ARG_BYTE_BUFFER, ARG_PARTIAL_BOOL)); + + DynamicArgs.Signature sigPartialBinary = partialBinaryBuilder.getMatchingSignature(onMsg); + if (sigPartialBinary != null) + { + // Found partial binary args + assertPartialMustHaveVoidReturn(onMsg); + final Object[] args = newCallArgs(sigPartialBinary.getCallArgs()); + args[0] = getSession(); + DynamicArgs invoker = partialBinaryBuilder.build(method, sigPartialBinary); + setOnBinary(new PartialBinaryMessageSink((partial) -> + { + args[1] = partial.getPayload(); + // TODO: handle byte[] version + args[2] = partial.isFin(); + invoker.invoke(endpoint, args); + return null; + }), onMsg); + } + + // Streaming TEXT --- + DynamicArgs.Builder streamingTextBuilder = new DynamicArgs.Builder(); + + decoders.supporting(Decoder.TextStream.class) + .forEach(registeredDecoder -> + streamingTextBuilder.addSignature( + createCallArgs( + SESSION, new Arg(registeredDecoder.objectType).required() + ) + ) + ); + + DynamicArgs.Signature sigStreamingText = streamingTextBuilder.getMatchingSignature(onMsg); + if (sigStreamingText != null) + { + // TODO: Found streaming text args + assertPartialMustHaveVoidReturn(onMsg); + final Object[] args = newCallArgs(sigStreamingText.getCallArgs()); + } + + // Streaming BINARY --- + DynamicArgs.Builder streamingBinaryBuilder = new DynamicArgs.Builder(); + + decoders.supporting(Decoder.BinaryStream.class) + .forEach(registeredDecoder -> + streamingBinaryBuilder.addSignature( + createCallArgs( + SESSION, new Arg(registeredDecoder.objectType).required() + ) + ) + ); + + DynamicArgs.Signature sigStreamingBinary = streamingBinaryBuilder.getMatchingSignature(onMsg); + if (sigStreamingBinary != null) + { + // TODO: found streaming binary args! + final Object[] args = newCallArgs(sigStreamingBinary.getCallArgs()); + DynamicArgs invoker = streamingBinaryBuilder.build(method, sigStreamingBinary); + /* + setOnBinary(new DecodedBinaryMessageSink( + (msg) -> + { + args[0] = getSession(); + args[1] = msg; + invoker.invoke(endpoint, args); + return null; + }), onMsg); + */ + } + // Test for PONG - + // TODO: super.setOnText() // TODO: super.setOnBinary() // TODO: super.setOnPong() @@ -328,20 +466,22 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions } } } - - private Encoder newEncoderFor(Class type) + + private Encoder getEncoderFor(Class type) { if ((type == Void.TYPE) || (type == Void.class)) { return null; } - + + // TODO: return a pre-initialized encoder from past call + Class encoderClass = encoders.getEncoderFor(type); if (encoderClass == null) { throw new InvalidWebSocketException("Unable to find Encoder for type " + type.getName()); } - + try { Encoder encoder = encoderClass.newInstance(); @@ -353,20 +493,55 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions throw new InvalidWebSocketException("Unable to initialize required Encoder: " + encoderClass.getName(), t); } } - + + private T getDecoderInstance(AvailableDecoders.RegisteredDecoder registeredDecoder, Class interfaceType) + { + // TODO: Return previous instantiated decoders here + + Class decoderClass = registeredDecoder.decoder; + try + { + Decoder decoder = decoderClass.newInstance(); + decoder.init(this.endpointConfig); + return (T) decoder; + } + catch (Throwable t) + { + throw new InvalidWebSocketException("Unable to initialize required Decoder: " + decoderClass.getName(), t); + } + } + private void assertSignatureValid(DynamicArgs.Signature sig, Class annotationClass, Method method) { if (sig != null) return; - + StringBuilder err = new StringBuilder(); err.append('@').append(annotationClass.getSimpleName()); err.append(' '); ReflectUtils.append(err, endpoint.getClass(), method); throw new InvalidSignatureException(err.toString()); } - - private Object[] newCallArgs(Arg[] callArgs) throws DecodeException + + private void assertPartialMustHaveVoidReturn(Method method) + { + if (method.getReturnType().isAssignableFrom(Void.TYPE)) + { + return; + } + + StringBuilder err = new StringBuilder(); + err.append("Partial @OnMessage handlers must be void return type: "); + ReflectUtils.append(err, endpoint.getClass(), method); + throw new InvalidSignatureException(err.toString()); + } + + public AvailableDecoders getAvailableDecoders() + { + return decoders; + } + + protected Object[] newCallArgs(Arg[] callArgs) throws DecodeException { int len = callArgs.length; Object[] args = new Object[callArgs.length]; @@ -380,7 +555,7 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions } return args; } - + private Object getDecodedStaticValue(String name, Class type) throws DecodeException { for (StaticArg args : staticArgs) @@ -390,24 +565,31 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions return AvailableDecoders.decodePrimitive(args.value, type); } } - + return null; } - + private DynamicArgs.Builder createDynamicArgs(Arg... args) { DynamicArgs.Builder argBuilder = new DynamicArgs.Builder(); + Arg[] callArgs = createCallArgs(args); + argBuilder.addSignature(callArgs); + return argBuilder; + } + + protected Arg[] createCallArgs(Arg... args) + { int argCount = args.length; if (this.staticArgs != null) argCount += this.staticArgs.size(); - + Arg callArgs[] = new Arg[argCount]; int idx = 0; for (Arg arg : args) { callArgs[idx++] = arg; } - + if (this.staticArgs != null) { for (StaticArg staticArg : this.staticArgs) @@ -418,9 +600,6 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions callArgs[idx++] = new Arg(staticArg.value.getClass()).setTag(staticArg.name); } } - - argBuilder.addSignature(callArgs); - - return argBuilder; + return callArgs; } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrOnTextFunction.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrOnTextFunction.java index 8cc95892dff..83d9d741866 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrOnTextFunction.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrOnTextFunction.java @@ -24,69 +24,56 @@ import java.util.function.Function; import javax.websocket.OnMessage; import javax.websocket.Session; +import org.eclipse.jetty.websocket.common.InvalidSignatureException; +import org.eclipse.jetty.websocket.common.reflect.Arg; +import org.eclipse.jetty.websocket.common.reflect.DynamicArgs; import org.eclipse.jetty.websocket.common.util.ReflectUtils; /** * javax.websocket {@link OnMessage} method {@link Function} for TEXT/{@link String} types */ -@Deprecated -public class JsrOnTextFunction implements Function +public class JsrOnTextFunction implements Function { -/* private static final DynamicArgs.Builder ARGBUILDER; - private static final int SESSION = 1; - private static final int TEXT = 2; - + private static final DynamicArgs.Builder ARGBUILDER; + private static final Arg SESSION = new Arg(Session.class); + private static final Arg TEXT = new Arg(String.class).required(); + static { ARGBUILDER = new DynamicArgs.Builder(); - ARGBUILDER.addSignature(new ExactSignature(String.class).indexedAs(TEXT)); - ARGBUILDER.addSignature(new ExactSignature(Session.class,String.class).indexedAs(SESSION,TEXT)); - } - - public static DynamicArgs.Builder getDynamicArgsBuilder() - { - return ARGBUILDER; + ARGBUILDER.addSignature(SESSION, TEXT); } public static boolean hasMatchingSignature(Method method) { return ARGBUILDER.hasMatchingSignature(method); - }*/ - + } + private final Session session; private final Object endpoint; private final Method method; - + private final DynamicArgs callable; + public JsrOnTextFunction(Session session, Object endpoint, Method method) { this.session = session; this.endpoint = endpoint; this.method = method; - - ReflectUtils.assertIsAnnotated(method,OnMessage.class); + + ReflectUtils.assertIsAnnotated(method, OnMessage.class); ReflectUtils.assertIsPublicNonStatic(method); - ReflectUtils.assertIsReturn(method,Void.TYPE); - - /*this.callable = ARGBUILDER.build(method); + ReflectUtils.assertIsReturn(method, Void.TYPE); + + this.callable = ARGBUILDER.build(method, SESSION, TEXT); if (this.callable == null) { - throw InvalidSignatureException.build(method,OnMessage.class,ARGBUILDER); + throw InvalidSignatureException.build(method, OnMessage.class, ARGBUILDER); } - this.callable.setArgReferences(SESSION,TEXT);*/ } - + @Override - public Void apply(String text) + public Object apply(T text) { - /*Object args[] = this.callable.toArgs(session,text); - try - { - method.invoke(endpoint,args); - } - catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) - { - throw new WebSocketException("Unable to call text message method " + ReflectUtils.toString(endpoint.getClass(),method),e); - }*/ - return null; + return this.callable.invoke(endpoint, session, text); } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrWholeTextEndpoint.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrWholeTextEndpoint.java new file mode 100644 index 00000000000..4730472972c --- /dev/null +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/function/JsrWholeTextEndpoint.java @@ -0,0 +1,55 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.function; + +import java.lang.reflect.Method; + +import javax.websocket.Decoder; +import javax.websocket.Session; + +import org.eclipse.jetty.websocket.common.reflect.Arg; +import org.eclipse.jetty.websocket.common.reflect.DynamicArgs; + +/** + * Possible Text Endpoint Functions + */ +public class JsrWholeTextEndpoint +{ + private final Arg SESSION = new Arg(Session.class); + private DynamicArgs.Builder ARGBUILDER; + + public JsrWholeTextEndpoint(JsrEndpointFunctions jsrFunctions) + { + ARGBUILDER = new DynamicArgs.Builder(); + + jsrFunctions.getAvailableDecoders().supporting(Decoder.Text.class) + .forEach(registeredDecoder -> + ARGBUILDER.addSignature( + jsrFunctions.createCallArgs( + SESSION, new Arg(registeredDecoder.objectType).required() + ) + ) + ); + } + + public DynamicArgs.Signature getMatchingSignature(Method onMsg) + { + return ARGBUILDER.getMatchingSignature(onMsg); + } +} diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedBinaryMessageSink.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedBinaryMessageSink.java new file mode 100644 index 00000000000..d9f7a9225b1 --- /dev/null +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedBinaryMessageSink.java @@ -0,0 +1,61 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.messages; + +import java.io.IOException; +import java.util.function.Function; + +import javax.websocket.DecodeException; +import javax.websocket.Decoder; +import javax.websocket.EncodeException; + +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.common.message.ByteBufferMessageSink; +import org.eclipse.jetty.websocket.jsr356.JsrSession; + +public class DecodedBinaryMessageSink extends ByteBufferMessageSink +{ + public DecodedBinaryMessageSink(JsrSession session, Decoder.Binary decoder, Function onMessageFunction) + { + super(session.getPolicy(), (byteBuf) -> + { + try + { + Object decoded = null; + + decoded = decoder.decode(byteBuf); + + // notify event + Object ret = onMessageFunction.apply(decoded); + + if (ret != null) + { + // send response + session.getBasicRemote().sendObject(ret); + } + + return null; + } + catch (DecodeException | EncodeException | IOException e) + { + throw new WebSocketException(e); + } + }); + } +} diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedTextMessageSink.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedTextMessageSink.java new file mode 100644 index 00000000000..4474384e3b2 --- /dev/null +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/messages/DecodedTextMessageSink.java @@ -0,0 +1,59 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.messages; + +import java.io.IOException; +import java.util.function.Function; + +import javax.websocket.DecodeException; +import javax.websocket.Decoder; +import javax.websocket.EncodeException; + +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.common.message.StringMessageSink; +import org.eclipse.jetty.websocket.jsr356.JsrSession; + +public class DecodedTextMessageSink extends StringMessageSink +{ + public DecodedTextMessageSink(JsrSession session, Decoder.Text decoder, Function onMessageFunction) + { + super(session.getPolicy(), (message) -> + { + try + { + Object decoded = decoder.decode(message); + + // notify event + Object ret = onMessageFunction.apply(decoded); + + if (ret != null) + { + // send response + session.getBasicRemote().sendObject(ret); + } + + return null; + } + catch (DecodeException | EncodeException | IOException e) + { + throw new WebSocketException(e); + } + }); + } +} diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions_OnMessage_BinaryTest.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions_OnMessage_BinaryTest.java index 17b48f09b44..28fe22a7755 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions_OnMessage_BinaryTest.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/function/JsrEndpointFunctions_OnMessage_BinaryTest.java @@ -18,13 +18,16 @@ package org.eclipse.jetty.websocket.jsr356.function; -import java.lang.reflect.InvocationTargetException; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpointConfig; import javax.websocket.EndpointConfig; import javax.websocket.OnMessage; @@ -46,20 +49,20 @@ import org.junit.Test; public class JsrEndpointFunctions_OnMessage_BinaryTest { private static ClientContainer container; - + @BeforeClass public static void initContainer() { container = new ClientContainer(); } - + private AvailableEncoders encoders = new AvailableEncoders(); private AvailableDecoders decoders = new AvailableDecoders(); private Map uriParams = new HashMap<>(); private EndpointConfig endpointConfig = new EmptyClientEndpointConfig(); - + private String expectedBuffer; - + public JsrSession newSession(Object websocket) { String id = JsrEndpointFunctions_OnMessage_BinaryTest.class.getSimpleName(); @@ -70,8 +73,8 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config); return new JsrSession(container, id, requestURI, ei, connection); } - - private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws InvocationTargetException, IllegalAccessException + + private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception { JsrEndpointFunctions endpointFunctions = new JsrEndpointFunctions( socket, container.getPolicy(), @@ -81,78 +84,85 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest uriParams, endpointConfig ); - + endpointFunctions.start(); + + assertThat("Has BinarySink", endpointFunctions.hasBinarySink(), is(true)); + // This invocation is the same for all tests ByteBuffer byteBuffer = ByteBuffer.wrap("Hello World".getBytes(StandardCharsets.UTF_8)); expectedBuffer = BufferUtil.toDetailString(byteBuffer); endpointFunctions.onBinary(byteBuffer, true); socket.assertEvent(String.format(expectedEventFormat, args)); } - + + @ClientEndpoint public static class MessageSocket extends TrackingSocket { - // TODO: Ambiguous declaration + // Invalid OnMessage - mandatory type (TEXT/BINARY) missing + @SuppressWarnings("IncorrectOnMessageMethodsInspection") @OnMessage public void onMessage() { addEvent("onMessage()"); } } - + @Test - public void testInvokeMessage() throws InvocationTargetException, IllegalAccessException + public void testInvokeMessage() throws Exception { assertOnMessageInvocation(new MessageSocket(), "onMessage()"); } - - public static class MessageTextSocket extends TrackingSocket + + @ClientEndpoint + public static class MessageByteBufferSocket extends TrackingSocket { @OnMessage - public void onMessage(String msg) + public void onMessage(ByteBuffer msg) { - addEvent("onMessage(%s)", msg); + addEvent("onMessage(%s)", BufferUtil.toUTF8String(msg)); } } - + @Test - public void testInvokeMessageText() throws InvocationTargetException, IllegalAccessException + public void testInvokeMessageByteBuffer() throws Exception { - assertOnMessageInvocation(new MessageTextSocket(), "onMessage(Hello World)"); + assertOnMessageInvocation(new MessageByteBufferSocket(), "onMessage(Hello World)"); } - + + @ClientEndpoint public static class MessageSessionSocket extends TrackingSocket { - // TODO: Ambiguous declaration + // Invalid OnMessage - mandatory type (TEXT/BINARY) missing @OnMessage public void onMessage(Session session) { addEvent("onMessage(%s)", session); } } - + @Test - public void testInvokeMessageSession() throws InvocationTargetException, IllegalAccessException + public void testInvokeMessageSession() throws Exception { assertOnMessageInvocation(new MessageSessionSocket(), "onMessage(JsrSession[CLIENT,%s,DummyConnection])", MessageSessionSocket.class.getName()); } - - public static class MessageSessionTextSocket extends TrackingSocket + + @ClientEndpoint + public static class MessageSessionByteBufferSocket extends TrackingSocket { @OnMessage - public void onMessage(Session session, String msg) + public void onMessage(Session session, ByteBuffer msg) { - - addEvent("onMessage(%s, %s)", session, msg); + addEvent("onMessage(%s, %s)", session, BufferUtil.toUTF8String(msg)); } } - + @Test - public void testInvokeMessageSessionText() throws InvocationTargetException, IllegalAccessException + public void testInvokeMessageSessionByteBuffer() throws Exception { - assertOnMessageInvocation(new MessageSessionTextSocket(), + assertOnMessageInvocation(new MessageSessionByteBufferSocket(), "onMessage(JsrSession[CLIENT,%s,DummyConnection], Hello World)", - MessageSessionTextSocket.class.getName()); + MessageSessionByteBufferSocket.class.getName()); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java index 68304d7b557..3bbb77516f9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java @@ -66,11 +66,11 @@ import org.eclipse.jetty.websocket.common.util.ReflectUtils; public class CommonEndpointFunctions extends AbstractLifeCycle implements EndpointFunctions { private static final Logger LOG = Log.getLogger(CommonEndpointFunctions.class); - + protected final Object endpoint; protected final WebSocketPolicy policy; protected final Executor executor; - + private T session; private Function onOpenFunction; private Function onCloseFunction; @@ -78,19 +78,19 @@ public class CommonEndpointFunctions extends AbstractLifeCycl private Function onFrameFunction; private Function onPingFunction; private Function onPongFunction; - + private MessageSink onTextSink; private MessageSink onBinarySink; - + private BatchMode batchMode; - + public CommonEndpointFunctions(Object endpoint, WebSocketPolicy policy, Executor executor) { Object e = endpoint; // unwrap endpoint while (e instanceof ManagedEndpoint) e = ((ManagedEndpoint) e).getRawEndpoint(); - + Objects.requireNonNull(endpoint, "Endpoint cannot be null"); Objects.requireNonNull(policy, "WebSocketPolicy cannot be null"); Objects.requireNonNull(executor, "Executor cannot be null"); @@ -98,65 +98,71 @@ public class CommonEndpointFunctions extends AbstractLifeCycl this.policy = policy; this.executor = executor; } - + @Override protected void doStart() throws Exception { super.doStart(); discoverEndpointFunctions(this.endpoint); } - + protected void discoverEndpointFunctions(Object endpoint) { boolean supportAnnotations = true; - + // Connection Listener if (endpoint instanceof WebSocketConnectionListener) { WebSocketConnectionListener listener = (WebSocketConnectionListener) endpoint; - setOnOpen((session) -> { + setOnOpen((session) -> + { listener.onWebSocketConnect(session); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketConnect", Session.class) ); - setOnClose((close) -> { + setOnClose((close) -> + { listener.onWebSocketClose(close.getStatusCode(), close.getReason()); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketClose", int.class, String.class) ); - setOnError((cause) -> { + setOnError((cause) -> + { listener.onWebSocketError(cause); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketError", Throwable.class)); supportAnnotations = false; } - + // Simple Data Listener if (endpoint instanceof WebSocketListener) { WebSocketListener listener = (WebSocketListener) endpoint; - - setOnText(new StringMessageSink(policy, (payload) -> { + + setOnText(new StringMessageSink(policy, (payload) -> + { listener.onWebSocketText(payload); return null; }), ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketText", String.class)); - setOnBinary(new ByteArrayMessageSink(policy, (payload) -> { + setOnBinary(new ByteArrayMessageSink(policy, (payload) -> + { listener.onWebSocketBinary(payload, 0, payload.length); return null; }), ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketBinary", byte[].class, int.class, int.class)); supportAnnotations = false; } - + // Ping/Pong Listener if (endpoint instanceof WebSocketPingPongListener) { WebSocketPingPongListener listener = (WebSocketPingPongListener) endpoint; - setOnPong((pong) -> { + setOnPong((pong) -> + { ByteBuffer payload = pong; if (pong == null) payload = BufferUtil.EMPTY_BUFFER; @@ -164,7 +170,8 @@ public class CommonEndpointFunctions extends AbstractLifeCycl return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPong", ByteBuffer.class)); - setOnPing((ping) -> { + setOnPing((ping) -> + { ByteBuffer payload = ping; if (ping == null) payload = BufferUtil.EMPTY_BUFFER; @@ -174,44 +181,47 @@ public class CommonEndpointFunctions extends AbstractLifeCycl ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPing", ByteBuffer.class)); supportAnnotations = false; } - + // Partial Data / Message Listener if (endpoint instanceof WebSocketPartialListener) { WebSocketPartialListener listener = (WebSocketPartialListener) endpoint; - setOnText(new PartialTextMessageSink((partial) -> { + setOnText(new PartialTextMessageSink((partial) -> + { listener.onWebSocketPartialText(partial.getPayload(), partial.isFin()); return null; }), ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPartialText", String.class, boolean.class)); - setOnBinary(new PartialBinaryMessageSink((partial) -> { + setOnBinary(new PartialBinaryMessageSink((partial) -> + { listener.onWebSocketPartialBinary(partial.getPayload(), partial.isFin()); return null; }), ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketPartialBinary", ByteBuffer.class, boolean.class)); supportAnnotations = false; } - + // Frame Listener if (endpoint instanceof WebSocketFrameListener) { WebSocketFrameListener listener = (WebSocketFrameListener) endpoint; - setOnFrame((frame) -> { + setOnFrame((frame) -> + { listener.onWebSocketFrame(new ReadOnlyDelegatedFrame(frame)); return null; }, ReflectUtils.findMethod(endpoint.getClass(), "onWebSocketFrame", Frame.class)); supportAnnotations = false; } - + if (supportAnnotations) discoverAnnotatedEndpointFunctions(endpoint); } - + protected void discoverAnnotatedEndpointFunctions(Object endpoint) { // Test for annotated websocket endpoint - + Class endpointClass = endpoint.getClass(); WebSocket websocket = endpointClass.getAnnotation(WebSocket.class); if (websocket != null) @@ -220,11 +230,11 @@ public class CommonEndpointFunctions extends AbstractLifeCycl policy.setMaxBinaryMessageSize(websocket.maxBinaryMessageSize()); policy.setMaxTextMessageSize(websocket.maxTextMessageSize()); policy.setIdleTimeout(websocket.maxIdleTime()); - + this.batchMode = websocket.batchMode(); - + Method onmethod = null; - + // OnWebSocketConnect [0..1] onmethod = ReflectUtils.findAnnotatedMethod(endpointClass, OnWebSocketConnect.class); if (onmethod != null) @@ -294,17 +304,17 @@ public class CommonEndpointFunctions extends AbstractLifeCycl } } } - + public BatchMode getBatchMode() { return batchMode; } - + public T getSession() { return session; } - + public void setOnOpen(Function function, Object origin) { assertNotSet(this.onOpenFunction, "Open Handler", origin); @@ -314,7 +324,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onOpen to " + describeOrigin(origin)); } } - + public void setOnClose(Function function, Object origin) { assertNotSet(this.onCloseFunction, "Close Handler", origin); @@ -324,7 +334,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onClose to " + describeOrigin(origin)); } } - + public void setOnError(Function function, Object origin) { assertNotSet(this.onErrorFunction, "Error Handler", origin); @@ -334,7 +344,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onError to " + describeOrigin(origin)); } } - + public void setOnText(MessageSink messageSink, Object origin) { assertNotSet(this.onTextSink, "TEXT Handler", origin); @@ -344,7 +354,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onText to " + describeOrigin(origin)); } } - + public void setOnBinary(MessageSink messageSink, Object origin) { assertNotSet(this.onBinarySink, "BINARY Handler", origin); @@ -354,7 +364,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onBinary to " + describeOrigin(origin)); } } - + public void setOnFrame(Function function, Object origin) { assertNotSet(this.onFrameFunction, "Frame Handler", origin); @@ -364,7 +374,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onFrame to " + describeOrigin(origin)); } } - + public void setOnPing(Function function, Object origin) { assertNotSet(this.onPingFunction, "Ping Handler", origin); @@ -374,7 +384,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onPing to " + describeOrigin(origin)); } } - + public void setOnPong(Function function, Object origin) { assertNotSet(this.onPongFunction, "Pong Handler", origin); @@ -384,111 +394,121 @@ public class CommonEndpointFunctions extends AbstractLifeCycl LOG.debug("Assigned onPong to " + describeOrigin(origin)); } } - + + public boolean hasBinarySink() + { + return this.onBinarySink != null; + } + + public boolean hasTextSink() + { + return this.onTextSink != null; + } + private String describeOrigin(Object obj) { if (obj == null) { return ""; } - + return obj.toString(); } - + protected void assertNotSet(Object val, String role, Object origin) { if (val == null) return; - + StringBuilder err = new StringBuilder(); err.append("Cannot replace previously assigned "); err.append(role); err.append(" with "); err.append(describeOrigin(origin)); - + throw new InvalidWebSocketException(err.toString()); } - + @Override public void onOpen(T session) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + this.session = session; - + if (onOpenFunction != null) onOpenFunction.apply(this.session); } - + @Override public void onClose(CloseInfo close) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onCloseFunction != null) onCloseFunction.apply(close); } - + @Override public void onFrame(Frame frame) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onFrameFunction != null) onFrameFunction.apply(frame); } - + @Override public void onError(Throwable cause) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onErrorFunction != null) onErrorFunction.apply(cause); else LOG.debug(cause); } - + @Override public void onText(ByteBuffer payload, boolean fin) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onTextSink != null) onTextSink.accept(payload, fin); } - + @Override public void onBinary(ByteBuffer payload, boolean fin) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onBinarySink != null) onBinarySink.accept(payload, fin); } - + @Override public void onPing(ByteBuffer payload) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onPingFunction != null) onPingFunction.apply(payload); } - + @Override public void onPong(ByteBuffer payload) { if (!isStarted()) throw new IllegalStateException(this.getClass().getName() + " not started"); - + if (onPongFunction != null) onPongFunction.apply(payload); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/OnTextFunction.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/OnTextFunction.java index fa83abd0969..7e7bce510a5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/OnTextFunction.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/OnTextFunction.java @@ -68,7 +68,7 @@ public class OnTextFunction implements Function ReflectUtils.assertIsAnnotated(method, OnWebSocketMessage.class); ReflectUtils.assertIsPublicNonStatic(method); ReflectUtils.assertIsReturn(method, Void.TYPE); - + this.callable = ARGBUILDER.build(method, ARG_SESSION, ARG_TEXT); if (this.callable == null) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/ByteBufferMessageSink.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/ByteBufferMessageSink.java index 2203d7c293e..09faf262df8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/ByteBufferMessageSink.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/ByteBufferMessageSink.java @@ -18,58 +18,20 @@ package org.eclipse.jetty.websocket.common.message; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.function.Function; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -public class ByteBufferMessageSink implements MessageSink +public class ByteBufferMessageSink extends ByteArrayMessageSink { - private static final int BUFFER_SIZE = 65535; - private final WebSocketPolicy policy; - private final Function onMessageFunction; - private ByteArrayOutputStream out; - private int size; - public ByteBufferMessageSink(WebSocketPolicy policy, Function onMessageFunction) { - this.policy = policy; - this.onMessageFunction = onMessageFunction; - } - - @Override - public void accept(ByteBuffer payload, Boolean fin) - { - try + super(policy, (byteArray) -> { - if (payload != null) - { - policy.assertValidBinaryMessageSize(size + payload.remaining()); - size += payload.remaining(); - - if (out == null) - out = new ByteArrayOutputStream(BUFFER_SIZE); - - BufferUtil.writeTo(payload,out); - } - } - catch (IOException e) - { - throw new RuntimeException("Unable to append Binary Message", e); - } - finally - { - if (fin) - { - ByteBuffer bbuf = ByteBuffer.wrap(out.toByteArray()); - onMessageFunction.apply(bbuf); - // reset - out = null; - size = 0; - } - } + ByteBuffer bbuf = ByteBuffer.wrap(byteArray); + onMessageFunction.apply(bbuf); + return null; + }); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/StringMessageSink.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/StringMessageSink.java index 3d41783aaea..437a58affe9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/StringMessageSink.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/StringMessageSink.java @@ -43,6 +43,7 @@ public class StringMessageSink implements MessageSink this.size = 0; } + @SuppressWarnings("Duplicates") @Override public void accept(ByteBuffer payload, Boolean fin) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/DynamicArgs.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/DynamicArgs.java index e5b9d5be6bd..917f4ec5bfa 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/DynamicArgs.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/DynamicArgs.java @@ -166,7 +166,7 @@ public class DynamicArgs */ private final BiFunction invoker; - private DynamicArgs(BiFunction invoker) + public DynamicArgs(BiFunction invoker) { this.invoker = invoker; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/UnorderedSignature.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/UnorderedSignature.java index c742357576e..11c4b00c839 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/UnorderedSignature.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/reflect/UnorderedSignature.java @@ -29,7 +29,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.common.reflect.DynamicArgs.Signature; import org.eclipse.jetty.websocket.common.util.ReflectUtils; -class UnorderedSignature implements Signature, Predicate +public class UnorderedSignature implements Signature, Predicate { private class SelectedArg extends Arg { @@ -52,17 +52,17 @@ class UnorderedSignature implements Signature, Predicate } private final static Logger LOG = Log.getLogger(UnorderedSignature.class); - private final Arg[] params; + private final Arg[] callArgs; public UnorderedSignature(Arg... args) { - this.params = args; + this.callArgs = args; } @Override public Arg[] getCallArgs() { - return this.params; + return this.callArgs; } @Override @@ -74,14 +74,14 @@ class UnorderedSignature implements Signature, Predicate @Override public boolean test(Method method) { - return getArgMapping(method, false, params) != null; + return getArgMapping(method, false, callArgs) != null; } public void appendDescription(StringBuilder str) { str.append('('); boolean delim = false; - for (Arg arg : params) + for (Arg arg : callArgs) { if (delim) { @@ -229,6 +229,20 @@ class UnorderedSignature implements Signature, Predicate return new UnorderedParamsFunction(method, argMapping); } + /** + * Generate BiFunction for this signature. + * + * @param method the method to get the invoker function for + * @return BiFunction of Endpoint Object, Call Args, Return Type + */ + public BiFunction newFunction(Method method) + { + int argMapping[] = getArgMapping(method, true, this.callArgs); + + // Return function capable of calling method + return new UnorderedParamsFunction(method, argMapping); + } + public static class UnorderedParamsFunction implements BiFunction {