Issue #3428 - changes to AbstractDecodedMessageSink signature from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-06-29 20:34:18 +10:00
parent acf47624a3
commit 4fdf52bd55
5 changed files with 37 additions and 45 deletions

View File

@ -21,11 +21,13 @@ package org.eclipse.jetty.websocket.javax.common.messages;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.Decoder; import javax.websocket.Decoder;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.decoders.RegisteredDecoder; import org.eclipse.jetty.websocket.javax.common.decoders.RegisteredDecoder;
import org.eclipse.jetty.websocket.util.messages.MessageSink; import org.eclipse.jetty.websocket.util.messages.MessageSink;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,18 +37,16 @@ public abstract class AbstractDecodedMessageSink implements MessageSink
{ {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDecodedMessageSink.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractDecodedMessageSink.class);
private final CoreSession _coreSession;
private final MethodHandle _methodHandle; private final MethodHandle _methodHandle;
private final MessageSink _messageSink; private final MessageSink _messageSink;
public AbstractDecodedMessageSink(CoreSession coreSession, MethodHandle methodHandle) public AbstractDecodedMessageSink(CoreSession coreSession, MethodHandle methodHandle)
{ {
_coreSession = coreSession;
_methodHandle = methodHandle; _methodHandle = methodHandle;
try try
{ {
_messageSink = getMessageSink(); _messageSink = newMessageSink(coreSession);
} }
catch (Exception e) catch (Exception e)
{ {
@ -55,21 +55,27 @@ public abstract class AbstractDecodedMessageSink implements MessageSink
} }
} }
public CoreSession getCoreSession() /**
* Invoke the MessageSink with the decoded message.
* @param args the decoded message.
*/
public void invoke(Object... args)
{ {
return _coreSession; try
} {
_methodHandle.invoke(args);
public MethodHandle getMethodHandle() }
{ catch (Throwable t)
return _methodHandle; {
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
} }
/** /**
* @return a message sink which will first decode the message then pass it to {@link #_methodHandle}. * @return a message sink which will first decode the message then pass it to {@link #_methodHandle}.
* @throws Exception for any error in creating the message sink. * @throws Exception for any error in creating the message sink.
*/ */
abstract MessageSink getMessageSink() throws Exception; abstract MessageSink newMessageSink(CoreSession coreSession) throws Exception;
@Override @Override
public void accept(Frame frame, Callback callback) public void accept(Frame frame, Callback callback)

View File

@ -45,12 +45,12 @@ public class DecodedBinaryMessageSink<T> extends AbstractDecodedMessageSink.Basi
} }
@Override @Override
MessageSink getMessageSink() throws Exception MessageSink newMessageSink(CoreSession coreSession) throws Exception
{ {
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup() MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(DecodedBinaryMessageSink.class, "onWholeMessage", MethodType.methodType(void.class, ByteBuffer.class)) .findVirtual(DecodedBinaryMessageSink.class, "onWholeMessage", MethodType.methodType(void.class, ByteBuffer.class))
.bindTo(this); .bindTo(this);
return new ByteBufferMessageSink(getCoreSession(), methodHandle); return new ByteBufferMessageSink(coreSession, methodHandle);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -63,17 +63,13 @@ public class DecodedBinaryMessageSink<T> extends AbstractDecodedMessageSink.Basi
try try
{ {
T obj = decoder.decode(wholeMessage); T obj = decoder.decode(wholeMessage);
getMethodHandle().invoke(obj); invoke(obj);
return; return;
} }
catch (DecodeException e) catch (DecodeException e)
{ {
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e); throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
} }
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
} }
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.javax.common.messages; package org.eclipse.jetty.websocket.javax.common.messages;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
@ -41,12 +42,12 @@ public class DecodedBinaryStreamMessageSink<T> extends AbstractDecodedMessageSin
} }
@Override @Override
MessageSink getMessageSink() throws Exception MessageSink newMessageSink(CoreSession coreSession) throws Exception
{ {
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup().findVirtual(DecodedBinaryStreamMessageSink.class, MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
"onStreamStart", MethodType.methodType(void.class, InputStream.class)) .findVirtual(DecodedBinaryStreamMessageSink.class, "onStreamStart", MethodType.methodType(void.class, InputStream.class))
.bindTo(this); .bindTo(this);
return new InputStreamMessageSink(getCoreSession(), methodHandle); return new InputStreamMessageSink(coreSession, methodHandle);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -55,15 +56,11 @@ public class DecodedBinaryStreamMessageSink<T> extends AbstractDecodedMessageSin
try try
{ {
T obj = _decoder.decode(stream); T obj = _decoder.decode(stream);
getMethodHandle().invoke(obj); invoke(obj);
} }
catch (DecodeException e) catch (DecodeException | IOException e)
{ {
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e); throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
} }
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
} }
} }

View File

@ -44,12 +44,12 @@ public class DecodedTextMessageSink<T> extends AbstractDecodedMessageSink.Basic<
} }
@Override @Override
MessageSink getMessageSink() throws NoSuchMethodException, IllegalAccessException MessageSink newMessageSink(CoreSession coreSession) throws NoSuchMethodException, IllegalAccessException
{ {
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup() MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(getClass(), "onMessage", MethodType.methodType(void.class, String.class)) .findVirtual(getClass(), "onMessage", MethodType.methodType(void.class, String.class))
.bindTo(this); .bindTo(this);
return new StringMessageSink(getCoreSession(), methodHandle); return new StringMessageSink(coreSession, methodHandle);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -62,17 +62,13 @@ public class DecodedTextMessageSink<T> extends AbstractDecodedMessageSink.Basic<
try try
{ {
T obj = decoder.decode(wholeMessage); T obj = decoder.decode(wholeMessage);
getMethodHandle().invoke(obj); invoke(obj);
return; return;
} }
catch (DecodeException e) catch (DecodeException e)
{ {
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e); throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
} }
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
} }
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.javax.common.messages; package org.eclipse.jetty.websocket.javax.common.messages;
import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
@ -41,12 +42,12 @@ public class DecodedTextStreamMessageSink<T> extends AbstractDecodedMessageSink.
} }
@Override @Override
MessageSink getMessageSink() throws Exception MessageSink newMessageSink(CoreSession coreSession) throws Exception
{ {
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup().findVirtual(DecodedTextStreamMessageSink.class, MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
"onStreamStart", MethodType.methodType(void.class, Reader.class)) .findVirtual(DecodedTextStreamMessageSink.class, "onStreamStart", MethodType.methodType(void.class, Reader.class))
.bindTo(this); .bindTo(this);
return new ReaderMessageSink(getCoreSession(), methodHandle); return new ReaderMessageSink(coreSession, methodHandle);
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -55,15 +56,11 @@ public class DecodedTextStreamMessageSink<T> extends AbstractDecodedMessageSink.
try try
{ {
T obj = _decoder.decode(reader); T obj = _decoder.decode(reader);
getMethodHandle().invoke(obj); invoke(obj);
} }
catch (DecodeException e) catch (DecodeException | IOException e)
{ {
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e); throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
} }
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
} }
} }