Wiring up extension chain on server side

This commit is contained in:
Joakim Erdfelt 2012-07-17 10:09:25 -07:00
parent c84df9bf2b
commit 249595882c
7 changed files with 128 additions and 60 deletions

View File

@ -36,8 +36,8 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames; import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.MessageInputStream; import org.eclipse.jetty.websocket.io.MessageInputStream;
import org.eclipse.jetty.websocket.io.MessageReader; import org.eclipse.jetty.websocket.io.MessageReader;
import org.eclipse.jetty.websocket.io.RawConnection;
import org.eclipse.jetty.websocket.io.StreamAppender; import org.eclipse.jetty.websocket.io.StreamAppender;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.CloseInfo; import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Frame; import org.eclipse.jetty.websocket.protocol.Frame;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
@ -58,7 +58,7 @@ public class WebSocketEventDriver implements IncomingFrames
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final EventMethods events; private final EventMethods events;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private RawConnection connection; private WebSocketSession connection;
private ByteBuffer activeMessage; private ByteBuffer activeMessage;
private StreamAppender activeStream; private StreamAppender activeStream;
@ -347,7 +347,7 @@ public class WebSocketEventDriver implements IncomingFrames
* @param conn * @param conn
* the connection * the connection
*/ */
public void setConnection(RawConnection conn) public void setConnection(WebSocketSession conn)
{ {
this.connection = conn; this.connection = conn;
} }

View File

@ -9,17 +9,29 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames
{ {
private static final Logger LOG = Log.getLogger(WebSocketSession.class); private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private RawConnection connection; private final RawConnection connection;
private final WebSocketPolicy policy;
private final String subprotocol;
private final WebSocketEventDriver websocket;
private OutgoingFrames outgoing; private OutgoingFrames outgoing;
private String subprotocol;
private WebSocketPolicy policy; public WebSocketSession(WebSocketEventDriver websocket, RawConnection connection, WebSocketPolicy policy, String subprotocol)
{
super();
this.websocket = websocket;
this.connection = connection;
this.policy = policy;
this.subprotocol = subprotocol;
}
@Override @Override
public void close() throws IOException public void close() throws IOException
@ -33,6 +45,16 @@ public class WebSocketSession implements WebSocketConnection
connection.close(statusCode,reason); connection.close(statusCode,reason);
} }
public IncomingFrames getIncoming()
{
return websocket;
}
public OutgoingFrames getOutgoing()
{
return outgoing;
}
@Override @Override
public WebSocketPolicy getPolicy() public WebSocketPolicy getPolicy()
{ {
@ -51,12 +73,33 @@ public class WebSocketSession implements WebSocketConnection
return subprotocol; return subprotocol;
} }
@Override
public void incoming(WebSocketException e)
{
// pass on incoming to websocket itself
websocket.incoming(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
// pass on incoming to websocket itself
websocket.incoming(frame);
}
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return connection.isOpen(); return connection.isOpen();
} }
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
// forward on to chain
outgoing.output(context,callback,frame);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -68,6 +111,11 @@ public class WebSocketSession implements WebSocketConnection
outgoing.output(context,callback,frame); outgoing.output(context,callback,frame);
} }
public void setOutgoing(OutgoingFrames outgoing)
{
this.outgoing = outgoing;
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -397,7 +397,7 @@ public class WebSocketFrame implements Frame
} }
data = ByteBuffer.allocate(len); data = ByteBuffer.allocate(len);
BufferUtil.clear(data); BufferUtil.clearToFill(data);
data.put(buf,0,len); data.put(buf,0,len);
BufferUtil.flipToFlush(data,0); BufferUtil.flipToFlush(data,0);
return this; return this;

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.websocket.examples.AnnotatedBinaryArraySocket;
import org.eclipse.jetty.websocket.examples.AnnotatedBinaryStreamSocket; import org.eclipse.jetty.websocket.examples.AnnotatedBinaryStreamSocket;
import org.eclipse.jetty.websocket.examples.AnnotatedFramesSocket; import org.eclipse.jetty.websocket.examples.AnnotatedFramesSocket;
import org.eclipse.jetty.websocket.examples.ListenerBasicSocket; import org.eclipse.jetty.websocket.examples.ListenerBasicSocket;
import org.eclipse.jetty.websocket.io.LocalWebSocketConnection; import org.eclipse.jetty.websocket.io.LocalWebSocketSession;
import org.eclipse.jetty.websocket.protocol.CloseInfo; import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -57,7 +57,7 @@ public class WebSocketEventDriverTest
AdapterConnectCloseSocket socket = new AdapterConnectCloseSocket(); AdapterConnectCloseSocket socket = new AdapterConnectCloseSocket();
WebSocketEventDriver driver = newDriver(socket); WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn); driver.setConnection(conn);
driver.onConnect(); driver.onConnect();
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame()); driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
@ -73,7 +73,7 @@ public class WebSocketEventDriverTest
AnnotatedBinaryArraySocket socket = new AnnotatedBinaryArraySocket(); AnnotatedBinaryArraySocket socket = new AnnotatedBinaryArraySocket();
WebSocketEventDriver driver = newDriver(socket); WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn); driver.setConnection(conn);
driver.onConnect(); driver.onConnect();
driver.incoming(makeBinaryFrame("Hello World",true)); driver.incoming(makeBinaryFrame("Hello World",true));
@ -91,7 +91,7 @@ public class WebSocketEventDriverTest
AnnotatedFramesSocket socket = new AnnotatedFramesSocket(); AnnotatedFramesSocket socket = new AnnotatedFramesSocket();
WebSocketEventDriver driver = newDriver(socket); WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn); driver.setConnection(conn);
driver.onConnect(); driver.onConnect();
driver.incoming(new WebSocketFrame(OpCode.PING).setPayload("PING")); driver.incoming(new WebSocketFrame(OpCode.PING).setPayload("PING"));
@ -114,7 +114,7 @@ public class WebSocketEventDriverTest
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket(); AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
WebSocketEventDriver driver = newDriver(socket); WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn); driver.setConnection(conn);
driver.onConnect(); driver.onConnect();
driver.incoming(makeBinaryFrame("Hello World",true)); driver.incoming(makeBinaryFrame("Hello World",true));
@ -132,7 +132,7 @@ public class WebSocketEventDriverTest
ListenerBasicSocket socket = new ListenerBasicSocket(); ListenerBasicSocket socket = new ListenerBasicSocket();
WebSocketEventDriver driver = newDriver(socket); WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); LocalWebSocketSession conn = new LocalWebSocketSession(testname);
driver.setConnection(conn); driver.setConnection(conn);
driver.onConnect(); driver.onConnect();
driver.incoming(WebSocketFrame.text("Hello World")); driver.incoming(WebSocketFrame.text("Hello World"));

View File

@ -15,17 +15,13 @@
//======================================================================== //========================================================================
package org.eclipse.jetty.websocket.io; package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.rules.TestName; import org.junit.rules.TestName;
public class LocalWebSocketConnection implements RawConnection, WebSocketConnection public class LocalWebSocketConnection implements RawConnection
{ {
private final String id; private final String id;
@ -59,24 +55,12 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect
{ {
} }
@Override
public WebSocketPolicy getPolicy()
{
return null;
}
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {
return null; return null;
} }
@Override
public String getSubProtocol()
{
return null;
}
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
@ -88,29 +72,9 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect
{ {
} }
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("LocalWebSocketConnection[%s]",id); return String.format("%s[%s]",LocalWebSocketConnection.class.getSimpleName(),id);
}
@Override
public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
} }
} }

View File

@ -0,0 +1,27 @@
package org.eclipse.jetty.websocket.io;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.junit.rules.TestName;
public class LocalWebSocketSession extends WebSocketSession
{
private String id;
public LocalWebSocketSession(TestName testname)
{
this(testname,null);
}
public LocalWebSocketSession(TestName testname, WebSocketEventDriver driver)
{
super(driver,new LocalWebSocketConnection(testname),WebSocketPolicy.newServerPolicy(),"testing");
this.id = testname.getMethodName();
}
@Override
public String toString()
{
return String.format("%s[%s]",LocalWebSocketSession.class.getSimpleName(),id);
}
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
@ -48,7 +49,9 @@ import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver; import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.io.IncomingFrames; import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection; import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76; import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455; import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
@ -285,12 +288,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
this.creator = creator; this.creator = creator;
} }
private IncomingFrames setupExtensionChain(WebSocketEventDriver websocket, List<Extension> extensions)
{
// TODO Auto-generated method stub
return websocket;
}
/** /**
* Upgrade the request/response to a WebSocket Connection. * Upgrade the request/response to a WebSocket Connection.
* <p> * <p>
@ -348,8 +345,40 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
LOG.debug("AsyncWebSocketConnection: {}",connection); LOG.debug("AsyncWebSocketConnection: {}",connection);
// Initialize / Negotiate Extensions // Initialize / Negotiate Extensions
WebSocketSession session = new WebSocketSession(websocket,connection,getPolicy(),response.getAcceptedSubProtocol());
List<Extension> extensions = initExtensions(request.getExtensions()); List<Extension> extensions = initExtensions(request.getExtensions());
IncomingFrames incoming = setupExtensionChain(websocket,extensions);
// Start with default routing.
IncomingFrames incoming = session;
OutgoingFrames outgoing = connection;
// Connect extensions
if (extensions != null)
{
Iterator<Extension> extIter;
// Connect outgoings
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
}
// Connect incomings
Collections.reverse(extensions);
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextIncomingFrames(incoming);
incoming = ext;
}
}
// configure session for outgoing flows
session.setOutgoing(outgoing);
// configure connection for incoming flows
connection.getParser().setIncomingFramesHandler(incoming); connection.getParser().setIncomingFramesHandler(incoming);
// Process (version specific) handshake response // Process (version specific) handshake response
@ -361,7 +390,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Notify POJO of connection // Notify POJO of connection
// TODO move to WebSocketAsyncConnection.onOpen // TODO move to WebSocketAsyncConnection.onOpen
websocket.setConnection(connection); websocket.setConnection(session);
websocket.onConnect(); websocket.onConnect();
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection); LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);