WebSocket - addressing bug in Partial message support

This commit is contained in:
Joakim Erdfelt 2013-08-20 15:31:12 -07:00
parent c548cba71c
commit 8c77fe2404
14 changed files with 491 additions and 51 deletions

View File

@ -40,6 +40,8 @@ import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
/**
* Base implementation for JSR-356 Annotated event drivers.
@ -78,25 +80,17 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
if (events.hasBinary())
{
handled = true;
if (activeMessage == null)
{
if (events.isBinaryPartialSupported())
{
LOG.debug("Partial Binary Message: fin={}",fin);
// Partial Message Support (does not use messageAppender)
try
{
events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
}
catch (DecodeException e)
{
onFatalError(e);
}
return;
LOG.debug("Partial Binary Message: fin={}",fin);
activeMessage = new BinaryPartialOnMessage(this);
}
else
{
// Whole Message Support
if (activeMessage == null)
{
LOG.debug("Whole Binary Message");
activeMessage = new SimpleBinaryMessage(this);
}
@ -191,7 +185,6 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
private void onFatalError(Throwable t)
{
onError(t);
// TODO: close connection?
}
@Override
@ -213,6 +206,30 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
}
}
public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
{
try
{
events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
}
catch (DecodeException e)
{
onFatalError(e);
}
}
public void onPartialTextMessage(String message, boolean fin)
{
try
{
events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
}
catch (DecodeException e)
{
onFatalError(e);
}
}
@Override
public void onPong(ByteBuffer buffer)
{
@ -257,26 +274,17 @@ public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements E
if (events.hasText())
{
handled = true;
if (activeMessage == null)
{
if (events.isTextPartialSupported())
{
// Partial Message Support
LOG.debug("Partial Text Message: fin={}",fin);
// Partial Message Support (does not use messageAppender)
try
{
String text = BufferUtil.toUTF8String(buffer);
events.callText(jsrsession.getAsyncRemote(),websocket,text,fin);
}
catch (DecodeException e)
{
onFatalError(e);
}
return;
activeMessage = new TextPartialOnMessage(this);
}
else
{
// Whole Message Support
if (activeMessage == null)
{
LOG.debug("Whole Text Message");
activeMessage = new SimpleTextMessage(this);
}

View File

@ -28,6 +28,9 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.jsr356.MessageHandlerWrapper;
/**
* Partial BINARY MessageAppender for MessageHandler.Partial interface
*/
public class BinaryPartialMessage implements MessageAppender
{
private final MessageHandlerWrapper msgWrapper;

View File

@ -0,0 +1,66 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.messages;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrAnnotatedEventDriver;
/**
* Partial BINARY MessageAppender for @{@link OnMessage} annotated methods
*/
public class BinaryPartialOnMessage implements MessageAppender
{
private final JsrAnnotatedEventDriver driver;
private boolean finished;
public BinaryPartialOnMessage(JsrAnnotatedEventDriver driver)
{
this.driver = driver;
this.finished = false;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
driver.onPartialBinaryMessage(BufferUtil.EMPTY_BUFFER,isLast);
}
else
{
driver.onPartialBinaryMessage(payload,isLast);
}
}
@Override
public void messageComplete()
{
finished = true;
}
}

View File

@ -28,6 +28,9 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.jsr356.MessageHandlerWrapper;
/**
* Partial TEXT MessageAppender for MessageHandler.Partial interface
*/
public class TextPartialMessage implements MessageAppender
{
@SuppressWarnings("unused")

View File

@ -0,0 +1,67 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.messages;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrAnnotatedEventDriver;
/**
* Partial TEXT MessageAppender for @{@link OnMessage} annotated methods
*/
public class TextPartialOnMessage implements MessageAppender
{
private final JsrAnnotatedEventDriver driver;
private boolean finished;
public TextPartialOnMessage(JsrAnnotatedEventDriver driver)
{
this.driver = driver;
this.finished = false;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
if (finished)
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
driver.onPartialTextMessage("",isLast);
}
else
{
String text = BufferUtil.toUTF8String(payload);
driver.onPartialTextMessage(text,isLast);
}
}
@Override
public void messageComplete()
{
finished = true;
}
}

View File

@ -33,7 +33,7 @@ public class AnnotatedServerEndpointMetadata extends AnnotatedEndpointMetadata<S
private final ServerEndpoint endpoint;
private final AnnotatedServerEndpointConfig config;
protected AnnotatedServerEndpointMetadata(ServerContainer container, Class<?> websocket, ServerEndpointConfig baseConfig) throws DeploymentException
protected AnnotatedServerEndpointMetadata(Class<?> websocket, ServerEndpointConfig baseConfig) throws DeploymentException
{
super(websocket);

View File

@ -117,7 +117,7 @@ public class ServerContainer extends ClientContainer implements javax.websocket.
if (anno != null)
{
// Annotated takes precedence here
AnnotatedServerEndpointMetadata ametadata = new AnnotatedServerEndpointMetadata(this,endpoint,config);
AnnotatedServerEndpointMetadata ametadata = new AnnotatedServerEndpointMetadata(endpoint,config);
AnnotatedEndpointScanner<ServerEndpoint,ServerEndpointConfig> scanner = new AnnotatedEndpointScanner<>(ametadata);
metadata = ametadata;
scanner.scan();

View File

@ -0,0 +1,160 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState;
public class DummyConnection implements LogicalConnection
{
private static final Logger LOG = Log.getLogger(DummyConnection.class);
private IOState iostate;
public DummyConnection()
{
this.iostate = new IOState();
}
@Override
public void close()
{
}
@Override
public void close(int statusCode, String reason)
{
}
@Override
public void disconnect()
{
}
@Override
public ByteBufferPool getBufferPool()
{
return null;
}
@Override
public Executor getExecutor()
{
return null;
}
@Override
public long getIdleTimeout()
{
return 0;
}
@Override
public IOState getIOState()
{
return this.iostate;
}
@Override
public InetSocketAddress getLocalAddress()
{
return null;
}
@Override
public long getMaxIdleTimeout()
{
return 0;
}
@Override
public WebSocketPolicy getPolicy()
{
return null;
}
@Override
public InetSocketAddress getRemoteAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public WebSocketSession getSession()
{
return null;
}
@Override
public boolean isOpen()
{
return false;
}
@Override
public boolean isReading()
{
return false;
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
callback.writeSuccess();
}
@Override
public void resume()
{
}
@Override
public void setMaxIdleTimeout(long ms)
{
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
LOG.debug("setNextIncomingFrames({})",incoming);
}
@Override
public void setSession(WebSocketSession session)
{
}
@Override
public SuspendToken suspend()
{
return null;
}
}

View File

@ -186,14 +186,12 @@ public class EchoTest
.expect("Hello World|OhMy");
// Partial message based
/*
EchoCase.add(TESTCASES,PartialTextSocket.class)
.addSplitMessage("Saved"," by ","zero")
.expect("('Saved',false)(' by ',false)('zero',true)");
EchoCase.add(TESTCASES,PartialTextSessionSocket.class)
.addSplitMessage("Built"," for"," the"," future")
.expect("('Built',false)(' for',false)(' the',false)(' future',true)");
*/
}
@BeforeClass

View File

@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.events.EventDriverImpl;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;
import org.eclipse.jetty.websocket.jsr356.endpoints.EndpointInstance;
import org.eclipse.jetty.websocket.jsr356.server.samples.partial.PartialTrackingSocket;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class OnPartialTest
{
@Rule
public TestName testname = new TestName();
public EventDriver toEventDriver(Object websocket) throws Throwable
{
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
policy.setInputBufferSize(1024);
policy.setMaxBinaryMessageBufferSize(1024);
policy.setMaxTextMessageBufferSize(1024);
// Event Driver Factory
EventDriverFactory factory = new EventDriverFactory(policy);
factory.addImplementation(new JsrServerEndpointImpl());
// Create EventDriver
EventDriverImpl driverImpl = new JsrServerEndpointImpl();
Class<?> endpoint = websocket.getClass();
ServerEndpoint anno = endpoint.getAnnotation(ServerEndpoint.class);
Assert.assertThat("Endpoint: " + endpoint + " should be annotated with @ServerEndpoint",anno,notNullValue());
ServerEndpointConfig config = new BasicServerEndpointConfig(endpoint,"/");
AnnotatedServerEndpointMetadata metadata = new AnnotatedServerEndpointMetadata(endpoint,config);
AnnotatedEndpointScanner<ServerEndpoint, ServerEndpointConfig> scanner = new AnnotatedEndpointScanner<>(metadata);
scanner.scan();
EndpointInstance ei = new EndpointInstance(websocket,config,metadata);
EventDriver driver = driverImpl.create(ei,policy);
Assert.assertThat("EventDriver",driver,notNullValue());
// Create Local JsrSession
String id = testname.getMethodName();
URI requestURI = URI.create("ws://localhost/" + id);
DummyConnection connection = new DummyConnection();
ClientContainer container = new ClientContainer();
@SuppressWarnings("resource")
JsrSession session = new JsrSession(requestURI,driver,connection,container,id);
session.setPolicy(policy);
session.open();
return driver;
}
@Test
public void testOnTextPartial() throws Throwable
{
List<WebSocketFrame> frames = new ArrayList<>();
frames.add(new TextFrame("Saved").setFin(false));
frames.add(new ContinuationFrame(" by ").setFin(false));
frames.add(new ContinuationFrame("zero").setFin(true));
PartialTrackingSocket socket = new PartialTrackingSocket();
EventDriver driver = toEventDriver(socket);
driver.onConnect();
for (WebSocketFrame frame : frames)
{
driver.incomingFrame(frame);
}
Assert.assertThat("Captured Event Queue size",socket.eventQueue.size(),is(3));
Assert.assertThat("Event[0]",socket.eventQueue.poll(),is("onPartial(\"Saved\",false)"));
Assert.assertThat("Event[1]",socket.eventQueue.poll(),is("onPartial(\" by \",false)"));
Assert.assertThat("Event[2]",socket.eventQueue.poll(),is("onPartial(\"zero\",true)"));
}
}

View File

@ -68,7 +68,6 @@ import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.ShortObjectT
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.ShortTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.streaming.ReaderParamSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.streaming.StringReturnReaderParamSocket;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -104,8 +103,6 @@ public class ServerAnnotatedEndpointScanner_GoodSignaturesTest
}
}
private static ServerContainer container = new ServerContainer(new DummyCreator(), new WebSocketServerFactory());
@Parameters
public static Collection<Case[]> data() throws Exception
{
@ -185,7 +182,7 @@ public class ServerAnnotatedEndpointScanner_GoodSignaturesTest
@Test
public void testScan_Basic() throws Exception
{
AnnotatedServerEndpointMetadata metadata = new AnnotatedServerEndpointMetadata(container,testcase.pojo,null);
AnnotatedServerEndpointMetadata metadata = new AnnotatedServerEndpointMetadata(testcase.pojo,null);
AnnotatedEndpointScanner<ServerEndpoint, ServerEndpointConfig> scanner = new AnnotatedEndpointScanner<>(metadata);
scanner.scan();

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.websocket.jsr356.server.samples.InvalidErrorIntSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.InvalidOpenCloseReasonSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.InvalidOpenIntSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.InvalidOpenSessionIntSocket;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -58,8 +57,6 @@ public class ServerAnnotatedEndpointScanner_InvalidSignaturesTest
{
private static final Logger LOG = Log.getLogger(ServerAnnotatedEndpointScanner_InvalidSignaturesTest.class);
private static ServerContainer container = new ServerContainer(new DummyCreator(), new WebSocketServerFactory());
@Parameters
public static Collection<Class<?>[]> data()
{
@ -97,7 +94,7 @@ public class ServerAnnotatedEndpointScanner_InvalidSignaturesTest
@Test
public void testScan_InvalidSignature() throws DeploymentException
{
AnnotatedServerEndpointMetadata metadata = new AnnotatedServerEndpointMetadata(container,pojo,null);
AnnotatedServerEndpointMetadata metadata = new AnnotatedServerEndpointMetadata(pojo,null);
AnnotatedEndpointScanner<ServerEndpoint,ServerEndpointConfig> scanner = new AnnotatedEndpointScanner<>(metadata);
try

View File

@ -0,0 +1,36 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server.samples.partial;
import java.io.IOException;
import javax.websocket.OnMessage;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.websocket.jsr356.server.TrackingSocket;
@ServerEndpoint("/echo/partial/tracking")
public class PartialTrackingSocket extends TrackingSocket
{
@OnMessage
public void onPartial(String msg, boolean fin) throws IOException
{
addEvent("onPartial(\"%s\",%b)",msg,fin);
}
}

View File

@ -50,7 +50,6 @@ import org.eclipse.jetty.websocket.client.io.ConnectionManager;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.client.masks.ZeroMasker;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriver;
@ -89,12 +88,7 @@ public class WebSocketClient extends ContainerLifeCycle
this.policy = WebSocketPolicy.newClientPolicy();
this.bufferPool = new MappedByteBufferPool();
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
if(LOG.isDebugEnabled()) {
LOG.debug("Using ZeroMasker (DEBUG)");
this.masker = new ZeroMasker();
} else {
this.masker = new RandomMasker();
}
this.eventDriverFactory = new EventDriverFactory(policy);
this.sessionFactory = new WebSocketSessionFactory();
}