First working TEXT echo thru the muxer (as test case)

This commit is contained in:
Joakim Erdfelt 2012-10-31 11:07:05 -07:00
parent 62f8e13397
commit 01c9e650fb
13 changed files with 162 additions and 38 deletions

View File

@ -254,6 +254,8 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing
public void setSession(WebSocketSession session)
{
this.session = session;
this.incoming = session;
session.setOutgoing(this);
}
public void setSubProtocol(String subProtocol)

View File

@ -154,6 +154,7 @@ public class MuxGenerator
public <C> void output(C context, Callback<C> callback, long channelId, WebSocketFrame frame) throws IOException
{
ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
BufferUtil.flipToFill(muxPayload);
// start building mux payload
writeChannelId(muxPayload,channelId);
@ -167,6 +168,7 @@ public class MuxGenerator
// build muxed frame
WebSocketFrame muxFrame = WebSocketFrame.binary();
BufferUtil.flipToFlush(muxPayload,0);
muxFrame.setPayload(muxPayload);
// NOTE: the physical connection will handle masking rules for this frame.

View File

@ -21,8 +21,10 @@ package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
@ -71,6 +73,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
private MuxGenerator generator;
private MuxAddServer addServer;
private MuxAddClient addClient;
/** The original request headers, used for delta encoded AddChannelRequest blocks */
private List<String> physicalRequestHeaders;
/** The original response headers, used for delta encoded AddChannelResponse blocks */
private List<String> physicalResponseHeaders;
public Muxer(final WebSocketConnection connection, final OutgoingFrames outgoing)
{
@ -135,6 +141,12 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
return physicalConnection.isOpen();
}
public String mergeHeaders(List<String> physicalHeaders, String deltaHeaders)
{
// TODO Auto-generated method stub
return null;
}
/**
* Per spec, the physical connection must be failed.
* <p>
@ -194,17 +206,27 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
// submit to upgrade handshake process
try
{
MuxAddChannelResponse response = addServer.handshake(physicalConnection,request);
if (response == null)
String requestHandshake = BufferUtil.toUTF8String(request.getHandshake());
if (request.isDeltaEncoded())
{
LOG.warn("AddChannelResponse is null");
// no upgrade possible?
response = new MuxAddChannelResponse();
response.setChannelId(request.getChannelId());
response.setFailed(true);
// Merge original request headers out of physical connection.
requestHandshake = mergeHeaders(physicalRequestHeaders,requestHandshake);
}
String responseHandshake = addServer.handshake(channel,requestHandshake);
if (StringUtil.isNotBlank(responseHandshake))
{
// Upgrade Success
MuxAddChannelResponse response = new MuxAddChannelResponse();
response.setChannelId(request.getChannelId());
response.setFailed(false);
response.setHandshake(responseHandshake);
// send response
this.generator.generate(response);
}
else
{
// TODO: trigger error?
}
// send response
this.generator.generate(response);
}
catch (Throwable t)
{
@ -369,6 +391,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
*/
public <C> void output(C context, Callback<C> callback, long channelId, WebSocketFrame frame) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("output({}, {}, {}, {})",context,callback,channelId,frame);
}
generator.output(context,callback,channelId,frame);
}
@ -394,4 +420,10 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
{
this.remoteAddress = remoteAddress;
}
@Override
public String toString()
{
return String.format("Muxer[subChannels.size=%d]", channels.size());
}
}

View File

@ -20,14 +20,27 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add;
import java.io.IOException;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
/**
* Server interface, for dealing with incoming AddChannelRequest and posting of AddChannelResponse back.
* Server interface, for dealing with incoming AddChannelRequest / AddChannelResponse flows.
*/
public interface MuxAddServer
{
MuxAddChannelResponse handshake(WebSocketConnection physicalConnection, MuxAddChannelRequest request) throws IOException;
/**
* Perform the handshake.
*
* @param channel
* the channel to attach the {@link WebSocketSession} to.
* @param requestHandshake
* the request handshake (request headers)
* @return the response handshake (the response headers)
* @throws MuxException
* if unable to handshake
* @throws IOException
* if unable to parse request headers
*/
String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException;
}

View File

@ -65,6 +65,16 @@ public class MuxAddChannelRequest implements MuxControlBlock
return rsv;
}
public boolean isDeltaEncoded()
{
return (enc == 1);
}
public boolean isIdentityEncoded()
{
return (enc == 0);
}
public void setChannelId(long channelId)
{
this.channelId = channelId;

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
@ -98,6 +99,11 @@ public class MuxAddChannelResponse implements MuxControlBlock
}
}
public void setHandshake(String responseHandshake)
{
setHandshake(BufferUtil.toBuffer(responseHandshake));
}
public void setRsv(byte rsv)
{
this.rsv = rsv;

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.core.examples.echo;
import java.io.IOException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.WebSocketAdapter;
/**
@ -27,16 +29,20 @@ import org.eclipse.jetty.websocket.core.api.WebSocketAdapter;
*/
public class AdapterEchoSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(AdapterEchoSocket.class);
@Override
public void onWebSocketText(String message)
{
if (isNotConnected())
{
LOG.debug("WebSocket Not Connected");
return;
}
try
{
LOG.debug("Echoing back message [{}]",message);
// echo the data back
getBlockingConnection().write(message);
}

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxDropChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxFlowControl;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxNewChannelSlot;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.junit.Assert;
public class MuxEventCapture implements MuxParser.Listener
@ -44,6 +45,24 @@ public class MuxEventCapture implements MuxParser.Listener
Assert.assertThat("Frame Count",frames.size(), is(expected));
}
public void assertHasFrame(byte opcode, long channelId, int expectedCount)
{
int actualCount = 0;
for (MuxedFrame frame : frames)
{
if (frame.getChannelId() == channelId)
{
if (frame.getOpCode() == opcode)
{
actualCount++;
}
}
}
Assert.assertThat("Expected Count of " + OpCode.name(opcode) + " frames on Channel ID " + channelId,actualCount,is(expectedCount));
}
public void assertHasOp(byte opCode, int expectedCount)
{
int actualCount = 0;

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
@ -32,6 +34,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
*/
public class MuxInjector implements OutgoingFrames
{
private static final Logger LOG = Log.getLogger(MuxInjector.class);
private IncomingFrames incoming;
private MuxGenerator generator;
@ -42,7 +45,7 @@ public class MuxInjector implements OutgoingFrames
this.generator.setOutgoing(this);
}
public void op(long channelId, WebSocketFrame frame) throws IOException
public void frame(long channelId, WebSocketFrame frame) throws IOException
{
this.generator.generate(channelId,frame);
}
@ -55,6 +58,7 @@ public class MuxInjector implements OutgoingFrames
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
LOG.debug("Injecting {} to {}",frame,incoming);
this.incoming.incoming(frame);
}
}

View File

@ -43,5 +43,6 @@ public class MuxReducer extends MuxEventCapture implements OutgoingFrames
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
parser.parse(frame);
callback.completed(context); // let blocked calls know the send is complete.
}
}

View File

@ -19,42 +19,52 @@
package org.eclipse.jetty.websocket.core.extensions.mux.add;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.examples.echo.AdapterEchoSocket;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory;
/**
* Dummy impl of MuxAddServer
*/
public class DummyMuxAddServer implements MuxAddServer
{
@SuppressWarnings("unused")
private static final Logger LOG = Log.getLogger(DummyMuxAddServer.class);
private AdapterEchoSocket echo;
private WebSocketPolicy policy;
private EventDriverFactory eventDriverFactory;
public DummyMuxAddServer()
{
this.policy = WebSocketPolicy.newServerPolicy();
this.eventDriverFactory = new EventDriverFactory(policy);
this.echo = new AdapterEchoSocket();
}
@Override
public MuxAddChannelResponse handshake(WebSocketConnection physicalConnection, MuxAddChannelRequest request) throws IOException
public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException
{
MuxAddChannelResponse response = new MuxAddChannelResponse();
response.setChannelId(request.getChannelId());
response.setEnc((byte)0);
StringBuilder hresp = new StringBuilder();
hresp.append("HTTP/1.1 101 Switching Protocols\r\n");
hresp.append("Connection: upgrade\r\n");
StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 101 Switching Protocols\r\n");
response.append("Connection: upgrade\r\n");
// not meaningful (per Draft 08) hresp.append("Upgrade: websocket\r\n");
// not meaningful (per Draft 08) hresp.append("Sec-WebSocket-Accept: Kgo85/8KVE8YPONSeyhgL3GwqhI=\r\n");
hresp.append("\r\n");
response.append("\r\n");
ByteBuffer handshake = BufferUtil.toBuffer(hresp.toString());
LOG.debug("Handshake: {}",BufferUtil.toDetailString(handshake));
EventDriver websocket = this.eventDriverFactory.wrap(echo);
WebSocketSession session = new WebSocketSession(websocket,channel,channel.getPolicy(),"echo");
channel.setSession(session);
channel.setSubProtocol("echo");
channel.onOpen();
session.onConnect();
response.setHandshake(handshake);
return response;
return response.toString();
}
}

View File

@ -29,6 +29,8 @@ import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -44,16 +46,17 @@ public class MuxerAddServerTest
{
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newServerPolicy());
physical.onOpen();
MuxReducer reducer = new MuxReducer();
// Represents a server side muxer.
Muxer muxer = new Muxer(physical,reducer);
DummyMuxAddServer addServer = new DummyMuxAddServer();
muxer.setAddServer(addServer);
MuxInjector inject = new MuxInjector(muxer);
// Trigger AddChannel
StringBuilder request = new StringBuilder();
request.append("GET /echo HTTP/1.1\r\n");
@ -71,11 +74,20 @@ public class MuxerAddServerTest
inject.op(req);
// Make sure we got AddChannelResponse
reducer.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1);
MuxAddChannelResponse response = (MuxAddChannelResponse)reducer.getOps().pop();
Assert.assertThat("AddChannelResponse.channelId",response.getChannelId(),is(1L));
Assert.assertThat("AddChannelResponse.failed",response.isFailed(),is(false));
Assert.assertThat("AddChannelResponse.handshake",response.getHandshake(),notNullValue());
Assert.assertThat("AddChannelResponse.handshakeSize",response.getHandshakeSize(),is(57L));
reducer.reset();
// Send simple echo request
inject.frame(1,WebSocketFrame.text("Hello World"));
// Test for echo response (is there a user echo websocket connected to the sub-channel?)
reducer.assertHasFrame(OpCode.TEXT,1L,1);
}
}

View File

@ -33,6 +33,7 @@ public class LocalWebSocketConnection implements WebSocketConnection
{
private final String id;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private boolean open = false;
public LocalWebSocketConnection()
{
@ -52,16 +53,19 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override
public void close()
{
open = false;
}
@Override
public void close(int statusCode, String reason)
{
open = false;
}
@Override
public void disconnect()
{
open = false;
}
@Override
@ -98,10 +102,9 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override
public boolean isOpen()
{
return false;
return open;
}
@Override
public boolean isOutputClosed()
{
@ -121,6 +124,10 @@ public class LocalWebSocketConnection implements WebSocketConnection
// TODO Auto-generated method stub
}
public void onOpen() {
open = true;
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{