Issue #4772 - support partial messages for Jetty WS API annotations (#6357)

* Issue #4772 - support partial messages for Jetty WS API annotations

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2021-06-11 00:05:54 +10:00 committed by GitHub
parent d997a1171b
commit 6dea0251c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 328 additions and 44 deletions

View File

@ -103,6 +103,7 @@ public interface RemoteEndpoint
* @param fragment the text being sent * @param fragment the text being sent
* @param isLast true if this is the last piece of the partial bytes * @param isLast true if this is the last piece of the partial bytes
* @param callback callback to notify of success or failure of the write operation * @param callback callback to notify of success or failure of the write operation
* @throws IOException this never throws IOException, it was a mistake to have this in the signature.
*/ */
void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException; void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException;

View File

@ -21,6 +21,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
/** /**
* Annotation for tagging methods to receive Binary or Text Message events. * Annotation for tagging methods to receive Binary or Text Message events.
@ -30,23 +31,31 @@ import org.eclipse.jetty.websocket.api.Session;
* <p> * <p>
* <u>Text Message Versions</u> * <u>Text Message Versions</u>
* <ol> * <ol>
* <li><code>public void methodName(String text)</code></li> * <li>{@code public void methodName(String text)}</li>
* <li><code>public void methodName({@link Session} session, String text)</code></li> * <li>{@code public void methodName(Session session, String text)}</li>
* <li><code>public void methodName(Reader reader)</code></li> * <li>{@code public void methodName(Reader reader)}</li>
* <li><code>public void methodName({@link Session} session, Reader reader)</code></li> * <li>{@code public void methodName(Session session, Reader reader)}</li>
* </ol> * </ol>
* Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to * <p>Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
* use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques. * use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques.</p>
* <p>
* <u>Binary Message Versions</u> * <u>Binary Message Versions</u>
* <ol> * <ol>
* <li><code>public void methodName(ByteBuffer message)</code></li> * <li>{@code public void methodName(ByteBuffer message)}</li>
* <li><code>public void methodName({@link Session} session, ByteBuffer message)</code></li> * <li>{@code public void methodName(Session session, ByteBuffer message)}</li>
* <li><code>public void methodName(byte buf[], int offset, int length)</code></li> * <li>{@code public void methodName(byte[] buf, int offset, int length)}</li>
* <li><code>public void methodName({@link Session} session, byte buf[], int offset, int length)</code></li> * <li>{@code public void methodName(Session session, byte[] buf, int offset, int length)}</li>
* <li><code>public void methodName(InputStream stream)</code></li> * <li>{@code public void methodName(InputStream stream)}</li>
* <li><code>public void methodName({@link Session} session, InputStream stream)</code></li> * <li>{@code public void methodName(Session session, InputStream stream)}</li>
* </ol> * </ol>
* <u>Partial Message Variations</u>
* <p>These are used to receive partial messages without aggregating them into a complete WebSocket message. Instead the a boolean
* argument is supplied to indicate whether this is the last segment of data of the message. See {@link WebSocketPartialListener}
* interface for more details on partial messages.</p>
* <ol>
* <li>{@code public void methodName(ByteBuffer payload, boolean last)}</li>
* <li>{@code public void methodName(String payload, boolean last)}</li>
* </ol>
* <p>Note: Similar to the signatures above these can all be used with an optional first {@link Session} parameter.</p>
*/ */
@Documented @Documented
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)

View File

@ -78,6 +78,51 @@ import org.eclipse.jetty.websocket.core.internal.util.ReflectUtils;
*/ */
public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
{ {
private static final InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};
private static final InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};
private static final InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};
private static final InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};
private static final InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};
private static final InvokerUtils.Arg[] textPartialCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};
private static final InvokerUtils.Arg[] binaryPartialBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};
private static final InvokerUtils.Arg[] binaryPartialArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(boolean.class).required()
};
private final WebSocketContainer container; private final WebSocketContainer container;
private final WebSocketComponents components; private final WebSocketComponents components;
private final Map<Class<?>, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>(); private final Map<Class<?>, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>();
@ -333,34 +378,6 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
if (onMessages != null && onMessages.length > 0) if (onMessages != null && onMessages.length > 0)
{ {
// The different kind of @OnWebSocketMessage method parameter signatures expected // The different kind of @OnWebSocketMessage method parameter signatures expected
InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};
InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};
InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};
InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};
InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};
for (Method onMsg : onMessages) for (Method onMsg : onMessages)
{ {
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class); assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
@ -409,11 +426,27 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
metadata.setTextHandler(ReaderMessageSink.class, methodHandle, onMsg); metadata.setTextHandler(ReaderMessageSink.class, methodHandle, onMsg);
continue; continue;
} }
else
methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, textPartialCallingArgs);
if (methodHandle != null)
{ {
// Not a valid @OnWebSocketMessage declaration signature // Partial Text Message
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg); assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setTextHandler(PartialStringMessageSink.class, methodHandle, onMsg);
continue;
} }
methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, binaryPartialBufferCallingArgs);
if (methodHandle != null)
{
// Partial ByteBuffer Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setBinaryHandle(PartialByteBufferMessageSink.class, methodHandle, onMsg);
continue;
}
// Not a valid @OnWebSocketMessage declaration signature
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg);
} }
} }

View File

@ -156,6 +156,7 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS); b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
} }
// FIXME: Remove the throws IOException from API for this method in the next major release.
@Override @Override
public void sendPartialString(String fragment, boolean isLast, WriteCallback callback) public void sendPartialString(String fragment, boolean isLast, WriteCallback callback)
{ {

View File

@ -0,0 +1,240 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.exceptions.InvalidWebSocketException;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AnnotatedPartialListenerTest
{
public static class PartialEchoSocket implements WebSocketPartialListener
{
private Session session;
@Override
public void onWebSocketConnect(Session session)
{
this.session = session;
}
@Override
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
{
session.getRemote().sendPartialBytes(payload, fin, WriteCallback.NOOP);
}
@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
try
{
session.getRemote().sendPartialString(payload, fin, WriteCallback.NOOP);
}
catch (IOException e)
{
throw new IllegalStateException(e);
}
}
}
@WebSocket
public static class PartialStringListener
{
public BlockingQueue<MessageSegment> messages = new LinkedBlockingQueue<>();
public static class MessageSegment
{
public String message;
public boolean last;
}
@OnWebSocketMessage
public void onMessage(String message, boolean last)
{
MessageSegment messageSegment = new MessageSegment();
messageSegment.message = message;
messageSegment.last = last;
messages.add(messageSegment);
}
}
@WebSocket
public static class PartialByteBufferListener
{
public BlockingQueue<MessageSegment> messages = new LinkedBlockingQueue<>();
public static class MessageSegment
{
public ByteBuffer buffer;
public boolean last;
}
@OnWebSocketMessage
public void onMessage(ByteBuffer buffer, boolean last)
{
MessageSegment messageSegment = new MessageSegment();
messageSegment.buffer = BufferUtil.copy(buffer);
messageSegment.last = last;
messages.add(messageSegment);
}
}
@WebSocket
public static class InvalidDoubleBinaryListener
{
@OnWebSocketMessage
public void onMessage(ByteBuffer bytes, boolean last)
{
}
@OnWebSocketMessage
public void onMessage(ByteBuffer bytes)
{
}
}
@WebSocket
public static class InvalidDoubleTextListener
{
@OnWebSocketMessage
public void onMessage(String content, boolean last)
{
}
@OnWebSocketMessage
public void onMessage(String content)
{
}
}
private Server server;
private URI serverUri;
private WebSocketClient client;
@BeforeEach
public void before() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JettyWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, container) ->
{
container.setAutoFragment(false);
container.addMapping("/", PartialEchoSocket.class);
}));
server.start();
serverUri = URI.create("ws://localhost:" + connector.getLocalPort() + "/");
client = new WebSocketClient();
client.setAutoFragment(false);
client.start();
}
@AfterEach
public void after() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testAnnotatedPartialString() throws Exception
{
PartialStringListener endpoint = new PartialStringListener();
try (Session session = client.connect(endpoint, serverUri).get(5, TimeUnit.SECONDS))
{
session.getRemote().sendPartialString("hell", false);
session.getRemote().sendPartialString("o w", false);
session.getRemote().sendPartialString("orld", true);
}
PartialStringListener.MessageSegment segment;
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.message, is("hell"));
assertThat(segment.last, is(false));
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.message, is("o w"));
assertThat(segment.last, is(false));
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.message, is("orld"));
assertThat(segment.last, is(true));
}
@Test
public void testAnnotatedPartialByteBuffer() throws Exception
{
PartialByteBufferListener endpoint = new PartialByteBufferListener();
try (Session session = client.connect(endpoint, serverUri).get(5, TimeUnit.SECONDS))
{
session.getRemote().sendPartialBytes(BufferUtil.toBuffer("hell"), false);
session.getRemote().sendPartialBytes(BufferUtil.toBuffer("o w"), false);
session.getRemote().sendPartialBytes(BufferUtil.toBuffer("orld"), true);
}
PartialByteBufferListener.MessageSegment segment;
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.buffer, is(BufferUtil.toBuffer("hell")));
assertThat(segment.last, is(false));
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.buffer, is(BufferUtil.toBuffer("o w")));
assertThat(segment.last, is(false));
segment = Objects.requireNonNull(endpoint.messages.poll(5, TimeUnit.SECONDS));
assertThat(segment.buffer, is(BufferUtil.toBuffer("orld")));
assertThat(segment.last, is(true));
}
@Test
public void testDoubleOnMessageAnnotation()
{
InvalidDoubleBinaryListener doubleBinaryListener = new InvalidDoubleBinaryListener();
assertThrows(InvalidWebSocketException.class, () -> client.connect(doubleBinaryListener, serverUri));
InvalidDoubleTextListener doubleTextListener = new InvalidDoubleTextListener();
assertThrows(InvalidWebSocketException.class, () -> client.connect(doubleTextListener, serverUri));
}
}