JSR-356 fixing calling of @OnMessage with partial message boolean

This commit is contained in:
Joakim Erdfelt 2013-07-30 14:27:48 -07:00
parent 702b675e86
commit f8172e5a59
7 changed files with 255 additions and 34 deletions

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.common.util.ReflectUtils;
import org.eclipse.jetty.websocket.jsr356.EncoderFactory;
import org.eclipse.jetty.websocket.jsr356.InitException;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.Param.Role;
public class OnMessageCallable extends JsrCallable
{
@ -129,6 +130,8 @@ public class OnMessageCallable extends JsrCallable
public void init(JsrSession session)
{
super.init(session);
idxPartialMessageFlag = findIndexForRole(Role.MESSAGE_PARTIAL_FLAG);
EncoderFactory.Wrapper encoderWrapper = session.getEncoderFactory().getWrapperFor(returnType);
if (encoderWrapper != null)
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.jsr356.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -25,14 +26,28 @@ import javax.websocket.server.ServerEndpoint;
public class EchoCase
{
public static EchoCase add(List<EchoCase[]> data, Class<?> serverPojo, String path)
public static class PartialBinary
{
EchoCase ecase = new EchoCase();
ecase.serverPojo = serverPojo;
ecase.path = path;
data.add(new EchoCase[]
{ ecase });
return ecase;
ByteBuffer part;
boolean fin;
public PartialBinary(ByteBuffer part, boolean fin)
{
this.part = part;
this.fin = fin;
}
}
public static class PartialText
{
String part;
boolean fin;
public PartialText(String part, boolean fin)
{
this.part = part;
this.fin = fin;
}
}
public static EchoCase add(List<EchoCase[]> data, Class<?> serverPojo)
@ -46,6 +61,16 @@ public class EchoCase
return ecase;
}
public static EchoCase add(List<EchoCase[]> data, Class<?> serverPojo, String path)
{
EchoCase ecase = new EchoCase();
ecase.serverPojo = serverPojo;
ecase.path = path;
data.add(new EchoCase[]
{ ecase });
return ecase;
}
// The websocket server pojo to test against
public Class<?> serverPojo;
// The (relative) URL path to hit
@ -61,6 +86,26 @@ public class EchoCase
return this;
}
public EchoCase addSplitMessage(ByteBuffer... parts)
{
int len = parts.length;
for (int i = 0; i < len; i++)
{
addMessage(new PartialBinary(parts[i],(i == (len-1))));
}
return this;
}
public EchoCase addSplitMessage(String... parts)
{
int len = parts.length;
for (int i = 0; i < len; i++)
{
addMessage(new PartialText(parts[i],(i == (len-1))));
}
return this;
}
public EchoCase expect(String message)
{
expectedStrings.add(message);
@ -83,13 +128,18 @@ public class EchoCase
str.append(",messages[").append(messages.size());
str.append("]=");
boolean delim = false;
for(Object msg: messages) {
if(delim) {
for (Object msg : messages)
{
if (delim)
{
str.append(",");
}
if(msg instanceof String) {
if (msg instanceof String)
{
str.append("'").append(msg).append("'");
} else {
}
else
{
str.append("(").append(msg.getClass().getName()).append(")");
str.append(msg);
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
@ -34,11 +35,9 @@ public class EchoClientSocket extends TrackingSocket
{
private Session session;
@OnOpen
public void onOpen(Session session)
public void close() throws IOException
{
this.session = session;
openLatch.countDown();
this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"Test Complete"));
}
@OnClose
@ -49,25 +48,37 @@ public class EchoClientSocket extends TrackingSocket
super.closeLatch.countDown();
}
public void sendObject(Object obj) throws IOException, EncodeException
{
session.getBasicRemote().sendObject(obj);
}
@OnError
public void onError(Throwable t)
{
addError(t);
}
@OnOpen
public void onOpen(Session session)
{
this.session = session;
openLatch.countDown();
}
@OnMessage
public void onText(String text)
{
addEvent(text);
}
public void close() throws IOException
public void sendObject(Object obj) throws IOException, EncodeException
{
this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"Test Complete"));
session.getBasicRemote().sendObject(obj);
}
public void sendPartialBinary(ByteBuffer part, boolean fin) throws IOException
{
session.getBasicRemote().sendBinary(part,fin);
}
public void sendPartialText(String part, boolean fin) throws IOException
{
session.getBasicRemote().sendText(part,fin);
}
}

View File

@ -33,6 +33,10 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.jsr356.server.EchoCase.PartialBinary;
import org.eclipse.jetty.websocket.jsr356.server.EchoCase.PartialText;
import org.eclipse.jetty.websocket.jsr356.server.samples.partial.PartialTextSessionSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.partial.PartialTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.BooleanObjectTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.BooleanTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.ByteObjectTextSocket;
@ -147,7 +151,7 @@ public class EchoTest
EchoCase.add(TESTCASES,FloatObjectTextSocket.class).addMessage("42").expect("42.0000");
EchoCase.add(TESTCASES,FloatObjectTextSocket.class).addMessage(".123").expect("0.1230");
EchoCase.add(TESTCASES,FloatObjectTextSocket.class).addMessage("50505E-6").expect("0.0505");
EchoCase.add(TESTCASES,IntTextSocket.class).addMessage((int)8).expect("8");
EchoCase.add(TESTCASES,IntTextSocket.class).addMessage((int)22).expect("22");
EchoCase.add(TESTCASES,IntTextSocket.class).addMessage("12345678").expect("12345678");
@ -171,17 +175,23 @@ public class EchoTest
EchoCase.add(TESTCASES,ShortObjectTextSocket.class).addMessage((int)4).expect("4");
EchoCase.add(TESTCASES,ShortObjectTextSocket.class).addMessage((int)987).expect("987");
EchoCase.add(TESTCASES,ShortObjectTextSocket.class).addMessage(-32001L).expect("-32001");
// PathParam based
EchoCase.add(TESTCASES,IntParamTextSocket.class).requestPath("/echo/primitives/integer/params/5678")
.addMessage(1234).expect("1234|5678");
EchoCase.add(TESTCASES,IntParamTextSocket.class).requestPath("/echo/primitives/integer/params/5678").addMessage(1234).expect("1234|5678");
// Reader based
EchoCase.add(TESTCASES,ReaderSocket.class).addMessage("Hello World").expect("Hello World");
EchoCase.add(TESTCASES,ReaderParamSocket.class).requestPath("/echo/streaming/readerparam/OhNo")
.addMessage("Hello World").expect("Hello World|OhNo");
EchoCase.add(TESTCASES,StringReturnReaderParamSocket.class).requestPath("/echo/streaming/readerparam2/OhMy")
.addMessage("Hello World").expect("Hello World|OhMy");
EchoCase.add(TESTCASES,ReaderParamSocket.class).requestPath("/echo/streaming/readerparam/OhNo").addMessage("Hello World").expect("Hello World|OhNo");
EchoCase.add(TESTCASES,StringReturnReaderParamSocket.class).requestPath("/echo/streaming/readerparam2/OhMy").addMessage("Hello World")
.expect("Hello World|OhMy");
// Partial message based
EchoCase.add(TESTCASES,PartialTextSocket.class)
.addSplitMessage("Saved"," by ","zero")
.expect("('Saved',false)(' by ',false)('zero',true)");
EchoCase.add(TESTCASES,PartialTextSessionSocket.class)
.addSplitMessage("Built"," for"," the"," future")
.expect("('Built',false)(' for',false)(' the',false)(' future',true)");
}
@BeforeClass
@ -249,8 +259,29 @@ public class EchoTest
int messageCount = 0;
for (Object msg : testcase.messages)
{
socket.sendObject(msg);
messageCount++;
if (msg instanceof PartialText)
{
PartialText pt = (PartialText)msg;
socket.sendPartialText(pt.part,pt.fin);
if (pt.fin)
{
messageCount++;
}
}
else if (msg instanceof PartialBinary)
{
PartialBinary pb = (PartialBinary)msg;
socket.sendPartialBinary(pb.part,pb.fin);
if (pb.fin)
{
messageCount++;
}
}
else
{
socket.sendObject(msg);
messageCount++;
}
}
// Collect Responses

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server.samples.partial;
import java.io.IOException;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.jsr356.server.StackUtil;
@ServerEndpoint("/echo/partial/textsession")
public class PartialTextSessionSocket
{
private static final Logger LOG = Log.getLogger(PartialTextSessionSocket.class);
private StringBuilder buf = new StringBuilder();
@OnMessage
public void onPartial(String msg, boolean fin, Session session) throws IOException
{
buf.append("('").append(msg).append("',").append(fin).append(')');
if (fin)
{
session.getBasicRemote().sendText(buf.toString());
buf.setLength(0);
}
}
@OnError
public void onError(Throwable cause, Session session) throws IOException
{
LOG.warn("Error",cause);
session.getBasicRemote().sendText("Exception: " + StackUtil.toString(cause));
}
}

View File

@ -0,0 +1,63 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server.samples.partial;
import java.io.IOException;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.jsr356.server.StackUtil;
@ServerEndpoint("/echo/partial/text")
public class PartialTextSocket
{
private static final Logger LOG = Log.getLogger(PartialTextSocket.class);
private Session session;
private StringBuilder buf = new StringBuilder();
@OnOpen
public void onOpen(Session session)
{
this.session = session;
}
@OnMessage
public void onPartial(String msg, boolean fin) throws IOException
{
buf.append("('").append(msg).append("',").append(fin).append(')');
if (fin)
{
session.getBasicRemote().sendText(buf.toString());
buf.setLength(0);
}
}
@OnError
public void onError(Throwable cause) throws IOException
{
LOG.warn("Error",cause);
session.getBasicRemote().sendText("Exception: " + StackUtil.toString(cause));
}
}

View File

@ -51,6 +51,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
public final OutgoingFrames outgoing;
private final ReentrantLock msgLock = new ReentrantLock();
private final AtomicInteger msgType = new AtomicInteger(NONE);
private boolean partialStarted = false;
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
@ -172,7 +173,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast);
if(partialStarted) {
frame.setContinuation(true);
}
blockingWrite(frame);
partialStarted = !isLast;
}
finally
{
@ -207,8 +212,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
LOG.debug("sendPartialString({}, {})",fragment,isLast);
}
WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast);
if(partialStarted) {
frame.setContinuation(true);
}
blockingWrite(frame);
partialStarted = !isLast;
}
finally
{