JSR-356 Fixing dispatching for Streaming reads

This commit is contained in:
Joakim Erdfelt 2013-07-30 12:49:04 -07:00
parent 37feaea34b
commit 702b675e86
12 changed files with 74 additions and 10 deletions

View File

@ -58,6 +58,7 @@ public class JsrSessionTest
ClientEndpointConfig config = new EmptyClientEndpointConfig();
DummyEndpoint websocket = new DummyEndpoint();
SimpleEndpointMetadata metadata = new SimpleEndpointMetadata(websocket.getClass());
// Executor executor = null;
EndpointInstance ei = new EndpointInstance(websocket,config,metadata);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.jsr356.samples;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.SuspendToken;
@ -60,6 +61,12 @@ public class DummyConnection implements LogicalConnection
return null;
}
@Override
public Executor getExecutor()
{
return null;
}
@Override
public long getIdleTimeout()
{

View File

@ -50,6 +50,9 @@ import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.LongObjectTe
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.LongTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.ShortObjectTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.primitives.ShortTextSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.streaming.ReaderParamSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.streaming.ReaderSocket;
import org.eclipse.jetty.websocket.jsr356.server.samples.streaming.StringReturnReaderParamSocket;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -174,13 +177,11 @@ public class EchoTest
.addMessage(1234).expect("1234|5678");
// Reader based
/* NOT WORKING YET
EchoCase.add(TESTCASES,ReaderSocket.class).addMessage("Hello World").expect("Hello World");
EchoCase.add(TESTCASES,ReaderParamSocket.class).requestPath("/echo/streaming/readerparam/OhNo")
.addMessage("Hello World").expect("Hello World|OnNo");
.addMessage("Hello World").expect("Hello World|OhNo");
EchoCase.add(TESTCASES,StringReturnReaderParamSocket.class).requestPath("/echo/streaming/readerparam2/OhMy")
.addMessage("Hello World").expect("Hello World|OhMy");
*/
}
@BeforeClass

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.common;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -63,6 +64,12 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* @return
*/
ByteBufferPool getBufferPool();
/**
* Get the Executor used by this connection.
* @return
*/
Executor getExecutor();
/**
* Get the read/write idle timeout.

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.MultiMap;
@ -59,6 +60,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private final URI requestURI;
private final EventDriver websocket;
private final LogicalConnection connection;
private final Executor executor;
private ExtensionFactory extensionFactory;
private String protocolVersion;
private Map<String, String[]> parameterMap = new HashMap<>();
@ -79,6 +81,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
this.requestURI = requestURI;
this.websocket = websocket;
this.connection = connection;
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
this.incomingHandler = websocket;
@ -134,8 +137,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
public void dispatch(Runnable runnable)
{
// TODO Auto-generated method stub
runnable.run();
executor.execute(runnable);
}
@Override

View File

@ -34,7 +34,7 @@ public class WebSocketSessionFactory implements SessionFactory
{
return (websocket instanceof JettyAnnotatedEventDriver) || (websocket instanceof JettyListenerEventDriver);
}
@Override
public WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
{

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@ -79,12 +80,13 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
if (events.onBinary.isStreaming())
{
activeMessage = new MessageInputStream(session.getConnection());
final MessageAppender msg = activeMessage;
dispatch(new Runnable()
{
@Override
public void run()
{
events.onBinary.call(websocket,session,activeMessage);
events.onBinary.call(websocket,session,msg);
}
});
}
@ -180,12 +182,13 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
if (events.onText.isStreaming())
{
activeMessage = new MessageReader(new MessageInputStream(session.getConnection()));
final MessageAppender msg = activeMessage;
dispatch(new Runnable()
{
@Override
public void run()
{
events.onText.call(websocket,session,activeMessage);
events.onText.call(websocket,session,msg);
}
});
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.common.extensions.mux;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@ -71,6 +72,13 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
}
@Override
public Executor getExecutor()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void close()

View File

@ -216,7 +216,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
this.setInputBufferSize(policy.getInputBufferSize());
}
@Override
public Executor getExecutor()
{
return super.getExecutor();
}
@Override
public void close()
{

View File

@ -25,6 +25,9 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.LogicalConnection;
/**
@ -34,6 +37,7 @@ import org.eclipse.jetty.websocket.common.LogicalConnection;
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
/**
* Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
@ -53,6 +57,11 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("appendMessage(ByteBuffer,{}): {}",isLast,BufferUtil.toDetailString(payload));
}
if (buffersExhausted.get())
{
// This indicates a programming mistake/error and must be bug fixed
@ -102,6 +111,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public void messageComplete()
{
LOG.debug("messageComplete()");
buffersExhausted.set(true);
// toss an empty ByteBuffer into queue to let it drain
try
@ -117,6 +128,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public int read() throws IOException
{
LOG.debug("read()");
try
{
if (closed.get())
@ -143,7 +156,10 @@ public class MessageInputStream extends InputStream implements MessageAppender
}
catch (InterruptedException e)
{
throw new IOException(e);
LOG.warn(e);
closed.set(true);
return -1;
// throw new IOException(e);
}
}

View File

@ -35,6 +35,9 @@ public class AnnotatedBinaryStreamSocket
@OnWebSocketMessage
public void onBinary(InputStream stream)
{
if(stream == null) {
new RuntimeException("Stream cannot be null").printStackTrace(System.err);
}
capture.add("onBinary(%s)",stream);
}

View File

@ -19,11 +19,13 @@
package org.eclipse.jetty.websocket.common.io;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -42,6 +44,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
private static final Logger LOG = Log.getLogger(LocalWebSocketConnection.class);
private final String id;
private final ByteBufferPool bufferPool;
private final Executor executor;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private IncomingFrames incoming;
private IOState ioState = new IOState();
@ -55,6 +58,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
{
this.id = id;
this.bufferPool = new MappedByteBufferPool();
this.executor = new ExecutorThreadPool();
this.ioState.addListener(this);
}
@ -62,6 +66,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
{
this(testname.getMethodName());
}
@Override
public Executor getExecutor()
{
return executor;
}
@Override
public void close()