Attempting to get Mux sub-channel to flow thru server as well

This commit is contained in:
Joakim Erdfelt 2012-11-02 11:20:36 -07:00
parent 88ed9ff710
commit e310b0a0ec
35 changed files with 1300 additions and 190 deletions

View File

@ -221,6 +221,13 @@ public class ClientUpgradeRequest implements UpgradeRequest
return headers.get(name);
}
@Override
public Map<String, List<String>> getHeaders()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHost()
{
@ -228,9 +235,10 @@ public class ClientUpgradeRequest implements UpgradeRequest
}
@Override
public String getHttpEndPointName()
public String getHttpVersion()
{
return httpEndPointName;
// TODO Auto-generated method stub
return null;
}
public String getKey()
@ -238,12 +246,39 @@ public class ClientUpgradeRequest implements UpgradeRequest
return key;
}
@Override
public String getMethod()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
@Override
public Map<String, String[]> getParameterMap()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getQueryString()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getRemoteURI()
{
return httpEndPointName;
}
@Override
public List<String> getSubProtocols()
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.client.internal.io;
import java.util.List;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.EndPoint;
@ -25,7 +26,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
@ -47,6 +50,12 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
this.masker = client.getMasker();
}
@Override
public void configureFromExtensions(List<Extension> extensions)
{
/* do nothing */
}
public DefaultWebSocketClient getClient()
{
return client;
@ -57,7 +66,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
{
super.onClose();
factory.sessionClosed(getSession());
};
}
@Override
public void onOpen()
@ -68,7 +77,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
connected = true;
}
super.onOpen();
}
};
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
@ -76,4 +85,10 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
masker.setMask(frame);
super.output(context,callback,frame);
}
@Override
public void setIncoming(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
}

View File

@ -20,25 +20,12 @@ package org.eclipse.jetty.websocket.core.api;
import java.net.InetSocketAddress;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.ConnectionState;
/**
* Base Connection concepts
*/
public interface BaseConnection
public interface LogicalConnection extends OutgoingFrames
{
/**
* Connection suspend token
*/
public static interface SuspendToken
{
/**
* Resume a previously suspended connection.
*/
void resume();
}
/**
* Send a websocket Close frame, without a status code or reason.
* <p>
@ -47,7 +34,7 @@ public interface BaseConnection
* @see StatusCode
* @see #close(int, String)
*/
void close();
public void close();
/**
* Send a websocket Close frame, with status code.
@ -60,7 +47,7 @@ public interface BaseConnection
* the (optional) reason. (can be null for no reason)
* @see StatusCode
*/
void close(int statusCode, String reason);
public void close(int statusCode, String reason);
/**
* Terminate the connection (no close frame sent)

View File

@ -0,0 +1,30 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.api;
/**
* Connection suspend token
*/
public interface SuspendToken
{
/**
* Resume a previously suspended connection.
*/
void resume();
}

View File

@ -33,12 +33,22 @@ public interface UpgradeRequest
public String getHeader(String name);
public Map<String, List<String>> getHeaders();
public String getHost();
public String getHttpEndPointName();
public String getHttpVersion();
public String getMethod();
public String getOrigin();
public Map<String,String[]> getParameterMap();
public String getQueryString();
public String getRemoteURI();
public List<String> getSubProtocols();
public boolean hasSubProtocol(String test);

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.util.Callback;
/**
* Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>.
*/
public interface WebSocketConnection extends BaseConnection
public interface WebSocketConnection extends LogicalConnection
{
/**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.

View File

@ -30,13 +30,14 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.BaseConnection.SuspendToken;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
@ -46,7 +47,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* MuxChannel, acts as WebSocketConnection for specific sub-channel.
*/
public class MuxChannel implements WebSocketConnection, IncomingFrames, OutgoingFrames, SuspendToken
public class MuxChannel implements WebSocketConnection, InternalConnection, IncomingFrames, OutgoingFrames, SuspendToken
{
private static final Logger LOG = Log.getLogger(MuxChannel.class);
@ -95,6 +96,12 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing
}
}
@Override
public void configureFromExtensions(List<Extension> extensions)
{
/* ignore */
}
@Override
public void disconnect()
{
@ -251,10 +258,16 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing
}
}
@Override
public void setIncoming(IncomingFrames incoming)
{
this.incoming = incoming;
}
@Override
public void setSession(WebSocketSession session)
{
this.session = session;
this.incoming = session;
session.setOutgoing(this);
}

View File

@ -81,7 +81,7 @@ public class MuxGenerator
MuxAddChannelRequest op = (MuxAddChannelRequest)block;
byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
b |= (byte)((op.getRsv() & 0x07) << 2); // rsv
b |= (op.getEnc() & 0x03); // enc
b |= (op.getEncoding() & 0x03); // enc
payload.put(b); // opcode + rsv + enc
writeChannelId(payload,op.getChannelId());
write139Buffer(payload,op.getHandshake());
@ -93,7 +93,7 @@ public class MuxGenerator
byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
b |= (op.isFailed()?0x10:0x00); // failure bit
b |= (byte)((op.getRsv() & 0x03) << 2); // rsv
b |= (op.getEnc() & 0x03); // enc
b |= (op.getEncoding() & 0x03); // enc
payload.put(b); // opcode + f + rsv + enc
writeChannelId(payload,op.getChannelId());
if (op.getHandshake() != null)

View File

@ -147,7 +147,7 @@ public class MuxParser
{
MuxAddChannelRequest op = new MuxAddChannelRequest();
op.setRsv((byte)((b & 0x1C) >> 2));
op.setEnc((byte)(b & 0x03));
op.setEncoding((byte)(b & 0x03));
op.setChannelId(readChannelId(buffer));
long handshakeSize = read139EncodedSize(buffer);
op.setHandshake(readBlock(buffer,handshakeSize));
@ -159,7 +159,7 @@ public class MuxParser
MuxAddChannelResponse op = new MuxAddChannelResponse();
op.setFailed((b & 0x10) != 0);
op.setRsv((byte)((byte)(b & 0x0C) >> 2));
op.setEnc((byte)(b & 0x03));
op.setEncoding((byte)(b & 0x03));
op.setChannelId(readChannelId(buffer));
long handshakeSize = read139EncodedSize(buffer);
op.setHandshake(readBlock(buffer,handshakeSize));

View File

@ -0,0 +1,217 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
public class MuxRequest implements UpgradeRequest
{
public static final String HEADER_VALUE_DELIM="\"\\\n\r\t\f\b%+ ;=";
public static UpgradeRequest merge(UpgradeRequest baseReq, UpgradeRequest deltaReq)
{
MuxRequest req = new MuxRequest(baseReq);
req.method = overlay(deltaReq.getMethod(),req.getMethod());
// TODO: finish
return req;
}
private static String overlay(String val, String defVal)
{
if (val == null)
{
return defVal;
}
return val;
}
public static UpgradeRequest parse(ByteBuffer handshake)
{
MuxRequest req = new MuxRequest();
// TODO Auto-generated method stub
return req;
}
private String method;
private String httpVersion;
private String remoteURI;
private String queryString;
private List<String> subProtocols;
private Map<String, String> cookies;
private List<ExtensionConfig> extensions;
private Map<String, List<String>> headers;
private Map<String, String[]> parameterMap;
public MuxRequest()
{
// TODO Auto-generated constructor stub
}
public MuxRequest(UpgradeRequest copy)
{
// TODO Auto-generated constructor stub
}
@Override
public void addExtensions(String... extConfigs)
{
for (String extConfig : extConfigs)
{
extensions.add(ExtensionConfig.parse(extConfig));
}
}
@Override
public Map<String, String> getCookieMap()
{
return cookies;
}
@Override
public List<ExtensionConfig> getExtensions()
{
return extensions;
}
@Override
public String getHeader(String name)
{
List<String> values = headers.get(name);
// not set
if ((values == null) || (values.isEmpty()))
{
return null;
}
// only 1 value (most common scenario)
if (values.size() == 1)
{
return values.get(0);
}
// merge multiple values together
StringBuilder ret = new StringBuilder();
boolean delim = false;
for (String value : values)
{
if (delim)
{
ret.append(", ");
}
QuotedStringTokenizer.quoteIfNeeded(ret,value,HEADER_VALUE_DELIM);
delim = true;
}
return ret.toString();
}
@Override
public Map<String, List<String>> getHeaders()
{
return headers;
}
@Override
public String getHost()
{
return getHeader("Host");
}
@Override
public String getHttpVersion()
{
return httpVersion;
}
@Override
public String getMethod()
{
return method;
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
@Override
public Map<String, String[]> getParameterMap()
{
return parameterMap;
}
@Override
public String getQueryString()
{
return queryString;
}
@Override
public String getRemoteURI()
{
return remoteURI;
}
@Override
public List<String> getSubProtocols()
{
return subProtocols;
}
@Override
public boolean hasSubProtocol(String test)
{
for (String protocol : subProtocols)
{
if (protocol.equalsIgnoreCase(test))
{
return true;
}
}
return false;
}
@Override
public boolean isOrigin(String test)
{
return test.equalsIgnoreCase(getOrigin());
}
@Override
public void setSubProtocols(String protocols)
{
this.subProtocols.clear();
if (StringUtil.isBlank(protocols))
{
return;
}
for (String protocol : protocols.split("\\s*,\\s*"))
{
this.subProtocols.add(protocol);
}
}
}

View File

@ -0,0 +1,129 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.websocket.core.api.UpgradeException;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
public class MuxResponse implements UpgradeResponse
{
@Override
public void addHeader(String name, String value)
{
// TODO Auto-generated method stub
}
@Override
public String getAcceptedSubProtocol()
{
// TODO Auto-generated method stub
return null;
}
@Override
public List<ExtensionConfig> getExtensions()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Set<String> getHeaderNamesSet()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHeaderValue(String name)
{
// TODO Auto-generated method stub
return null;
}
@Override
public Iterator<String> getHeaderValues(String name)
{
// TODO Auto-generated method stub
return null;
}
@Override
public int getStatusCode()
{
// TODO Auto-generated method stub
return 0;
}
@Override
public String getStatusReason()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isSuccess()
{
// TODO Auto-generated method stub
return false;
}
@Override
public void sendForbidden(String message) throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void setAcceptedSubProtocol(String protocol)
{
// TODO Auto-generated method stub
}
@Override
public void setExtensions(List<ExtensionConfig> extensions)
{
// TODO Auto-generated method stub
}
@Override
public void setHeader(String name, String value)
{
// TODO Auto-generated method stub
}
@Override
public void validateWebSocketHash(String expectedHash) throws UpgradeException
{
// TODO Auto-generated method stub
}
}

View File

@ -24,12 +24,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
@ -98,6 +98,29 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
return addServer;
}
public MuxChannel getChannel(long channelId, boolean create)
{
if (channelId == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
MuxChannel channel = channels.get(channelId);
if (channel == null)
{
if (create)
{
channel = new MuxChannel(channelId,this);
channels.put(channelId,channel);
}
else
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
}
return channel;
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -193,39 +216,35 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
}
if (request.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Pre-allocate channel.
long channelId = request.getChannelId();
MuxChannel channel = new MuxChannel(channelId,this);
this.channels.put(channelId,channel);
MuxChannel channel = getChannel(channelId, true);
// submit to upgrade handshake process
try
{
String requestHandshake = BufferUtil.toUTF8String(request.getHandshake());
if (request.isDeltaEncoded())
switch (request.getEncoding())
{
// Merge original request headers out of physical connection.
requestHandshake = mergeHeaders(physicalRequestHeaders,requestHandshake);
case MuxAddChannelRequest.IDENTITY_ENCODING:
{
UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
addServer.handshake(this,channel,idenReq);
break;
}
String responseHandshake = addServer.handshake(channel,requestHandshake);
if (StringUtil.isNotBlank(responseHandshake))
case MuxAddChannelRequest.DELTA_ENCODING:
{
// Upgrade Success
MuxAddChannelResponse response = new MuxAddChannelResponse();
response.setChannelId(request.getChannelId());
response.setFailed(false);
response.setHandshake(responseHandshake);
// send response
this.generator.generate(response);
UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
addServer.handshake(this,channel,mergedReq);
break;
}
else
default:
{
// TODO: trigger error?
// TODO: ERROR
break;
}
}
}
catch (Throwable t)
@ -250,18 +269,9 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
}
if (response.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Process channel
long channelId = response.getChannelId();
MuxChannel channel = this.channels.get(channelId);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
MuxChannel channel = getChannel(channelId,false);
// Process Response headers
try
@ -288,18 +298,9 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
@Override
public void onMuxDropChannel(MuxDropChannel drop)
{
if (drop.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Process channel
long channelId = drop.getChannelId();
MuxChannel channel = this.channels.get(channelId);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
MuxChannel channel = getChannel(channelId,false);
String reason = "Mux " + drop.toString();
reason = StringUtil.truncate(reason,(WebSocketFrame.MAX_CONTROL_PAYLOAD - 2));
@ -335,11 +336,6 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
@Override
public void onMuxFlowControl(MuxFlowControl flow)
{
if (flow.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
@ -347,11 +343,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
// Process channel
long channelId = flow.getChannelId();
MuxChannel channel = this.channels.get(channelId);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
MuxChannel channel = getChannel(channelId,false);
// TODO: set channel quota
}
@ -394,6 +386,18 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
generator.output(context,callback,channelId,frame);
}
/**
* Write an OP out the physical connection.
*
* @param op
* the mux operation to write
* @throws IOException
*/
public void output(MuxControlBlock op) throws IOException
{
generator.generate(op);
}
public void setAddClient(MuxAddClient addClient)
{
this.addClient = addClient;

View File

@ -20,8 +20,11 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add;
import java.io.IOException;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
/**
@ -29,6 +32,10 @@ import org.eclipse.jetty.websocket.core.io.WebSocketSession;
*/
public interface MuxAddServer
{
public UpgradeRequest getPhysicalHandshakeRequest();
public UpgradeResponse getPhysicalHandshakeResponse();
/**
* Perform the handshake.
*
@ -36,11 +43,10 @@ public interface MuxAddServer
* the channel to attach the {@link WebSocketSession} to.
* @param requestHandshake
* the request handshake (request headers)
* @return the response handshake (the response headers)
* @throws AbstractMuxException
* if unable to handshake
* @throws IOException
* if unable to parse request headers
*/
String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException;
void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException;
}

View File

@ -20,13 +20,18 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
public class MuxAddChannelRequest implements MuxControlBlock
{
public static final byte IDENTITY_ENCODING = (byte)0x00;
public static final byte DELTA_ENCODING = (byte)0x01;
private long channelId = -1;
private byte enc;
private byte encoding;
private ByteBuffer handshake;
private byte rsv;
@ -35,9 +40,9 @@ public class MuxAddChannelRequest implements MuxControlBlock
return channelId;
}
public byte getEnc()
public byte getEncoding()
{
return enc;
return encoding;
}
public ByteBuffer getHandshake()
@ -67,12 +72,12 @@ public class MuxAddChannelRequest implements MuxControlBlock
public boolean isDeltaEncoded()
{
return (enc == 1);
return (encoding == DELTA_ENCODING);
}
public boolean isIdentityEncoded()
{
return (enc == 0);
return (encoding == IDENTITY_ENCODING);
}
public void setChannelId(long channelId)
@ -80,9 +85,9 @@ public class MuxAddChannelRequest implements MuxControlBlock
this.channelId = channelId;
}
public void setEnc(byte enc)
public void setEncoding(byte enc)
{
this.enc = enc;
this.encoding = enc;
}
public void setHandshake(ByteBuffer handshake)
@ -97,6 +102,11 @@ public class MuxAddChannelRequest implements MuxControlBlock
}
}
public void setHandshake(String rawstring)
{
setHandshake(BufferUtil.toBuffer(rawstring,StringUtil.__UTF8_CHARSET));
}
public void setRsv(byte rsv)
{
this.rsv = rsv;

View File

@ -21,13 +21,17 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
public class MuxAddChannelResponse implements MuxControlBlock
{
public static final byte IDENTITY_ENCODING = (byte)0x00;
public static final byte DELTA_ENCODING = (byte)0x01;
private long channelId;
private byte enc;
private byte encoding;
private byte rsv;
private boolean failed = false;
private ByteBuffer handshake;
@ -37,9 +41,9 @@ public class MuxAddChannelResponse implements MuxControlBlock
return channelId;
}
public byte getEnc()
public byte getEncoding()
{
return enc;
return encoding;
}
public ByteBuffer getHandshake()
@ -67,19 +71,29 @@ public class MuxAddChannelResponse implements MuxControlBlock
return rsv;
}
public boolean isDeltaEncoded()
{
return (encoding == DELTA_ENCODING);
}
public boolean isFailed()
{
return failed;
}
public boolean isIdentityEncoded()
{
return (encoding == IDENTITY_ENCODING);
}
public void setChannelId(long channelId)
{
this.channelId = channelId;
}
public void setEnc(byte enc)
public void setEncoding(byte enc)
{
this.enc = enc;
this.encoding = enc;
}
public void setFailed(boolean failed)
@ -101,7 +115,7 @@ public class MuxAddChannelResponse implements MuxControlBlock
public void setHandshake(String responseHandshake)
{
setHandshake(BufferUtil.toBuffer(responseHandshake));
setHandshake(BufferUtil.toBuffer(responseHandshake,StringUtil.__UTF8_CHARSET));
}
public void setRsv(byte rsv)

View File

@ -37,9 +37,9 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.CloseException;
import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
@ -53,7 +53,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements BaseConnection, BaseConnection.SuspendToken, OutgoingFrames
public abstract class AbstractWebSocketConnection extends AbstractConnection implements InternalConnection, SuspendToken
{
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io;
import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Utility class to pipe {@link IncomingFrames} and {@link OutgoingFrames} around
*/
public class FramePipes
{
private static class In2Out implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(In2Out.class);
private OutgoingFrames outgoing;
public In2Out(OutgoingFrames outgoing)
{
this.outgoing = outgoing;
}
@Override
public void incoming(WebSocketException e)
{
/* cannot send exception on */
}
@Override
public void incoming(WebSocketFrame frame)
{
try
{
this.outgoing.output(null,new FutureCallback<>(),frame);
}
catch (IOException e)
{
LOG.debug(e);
}
}
}
private static class Out2In implements OutgoingFrames
{
private IncomingFrames incoming;
public Out2In(IncomingFrames incoming)
{
this.incoming = incoming;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
this.incoming.incoming(frame);
}
}
public static OutgoingFrames to(final IncomingFrames incoming)
{
return new Out2In(incoming);
}
public static IncomingFrames to(final OutgoingFrames outgoing)
{
return new In2Out(outgoing);
}
}

View File

@ -0,0 +1,33 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io;
import java.util.List;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.LogicalConnection;
public interface InternalConnection extends LogicalConnection
{
void configureFromExtensions(List<Extension> extensions);
void setIncoming(IncomingFrames incoming);
void setSession(WebSocketSession session);
}

View File

@ -26,7 +26,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.LogicalConnection;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
@ -41,17 +42,17 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
/**
* The reference to the base connection.
* The reference to the logical connection.
* <p>
* This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxConnection when MUX is in the picture.
* This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxChannel when MUX is in the picture.
*/
private final BaseConnection baseConnection;
private final LogicalConnection baseConnection;
private final WebSocketPolicy policy;
private final String subprotocol;
private final EventDriver websocket;
private OutgoingFrames outgoing;
public WebSocketSession(EventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol)
public WebSocketSession(EventDriver websocket, LogicalConnection connection, WebSocketPolicy policy, String subprotocol)
{
super();
this.websocket = websocket;

View File

@ -27,13 +27,13 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Helpful utility class to parse arbitrary mux events from a physical connection's OutgoingFrames.
*
* @see MuxInjector
* @see MuxEncoder
*/
public class MuxReducer extends MuxEventCapture implements OutgoingFrames
public class MuxDecoder extends MuxEventCapture implements OutgoingFrames
{
private MuxParser parser;
public MuxReducer()
public MuxDecoder()
{
parser = new MuxParser();
parser.setEvents(this);

View File

@ -20,9 +20,7 @@ package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.io.FramePipes;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
@ -30,19 +28,26 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Helpful utility class to send arbitrary mux events into a physical connection's IncomingFrames.
*
* @see MuxReducer
* @see MuxDecoder
*/
public class MuxInjector implements OutgoingFrames
public class MuxEncoder
{
private static final Logger LOG = Log.getLogger(MuxInjector.class);
private IncomingFrames incoming;
public static MuxEncoder toIncoming(IncomingFrames incoming)
{
return new MuxEncoder(FramePipes.to(incoming));
}
public static MuxEncoder toOutgoing(OutgoingFrames outgoing)
{
return new MuxEncoder(outgoing);
}
private MuxGenerator generator;
public MuxInjector(IncomingFrames incoming)
private MuxEncoder(OutgoingFrames outgoing)
{
this.incoming = incoming;
this.generator = new MuxGenerator();
this.generator.setOutgoing(this);
this.generator.setOutgoing(outgoing);
}
public void frame(long channelId, WebSocketFrame frame) throws IOException
@ -54,11 +59,4 @@ public class MuxInjector implements OutgoingFrames
{
this.generator.generate(op);
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
LOG.debug("Injecting {} to {}",frame,incoming);
this.incoming.incoming(frame);
}
}

View File

@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux.add;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
public class DummyMuxAddClient implements MuxAddClient
{
@Override
public WebSocketSession createSession(MuxAddChannelResponse response)
{
// TODO Auto-generated method stub
return null;
}
}

View File

@ -22,10 +22,14 @@ import java.io.IOException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.examples.echo.AdapterEchoSocket;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory;
@ -49,7 +53,21 @@ public class DummyMuxAddServer implements MuxAddServer
}
@Override
public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException
public UpgradeRequest getPhysicalHandshakeRequest()
{
// TODO Auto-generated method stub
return null;
}
@Override
public UpgradeResponse getPhysicalHandshakeResponse()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException
{
StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 101 Switching Protocols\r\n");
@ -65,6 +83,12 @@ public class DummyMuxAddServer implements MuxAddServer
channel.onOpen();
session.onConnect();
return response.toString();
MuxAddChannelResponse addChannelResponse = new MuxAddChannelResponse();
addChannelResponse.setChannelId(channel.getChannelId());
addChannelResponse.setEncoding(MuxAddChannelResponse.IDENTITY_ENCODING);
addChannelResponse.setFailed(false);
addChannelResponse.setHandshake(response.toString());
muxer.output(addChannelResponse);
}
}

View File

@ -0,0 +1,104 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux.add;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxDecoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxEncoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class MuxerAddClientTest
{
@Rule
public TestName testname = new TestName();
@Test
@Ignore("Interrim, not functional yet")
public void testAddChannel_Client() throws Exception
{
// Client side physical socket
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newClientPolicy());
physical.onOpen();
// Server Reader
MuxDecoder serverRead = new MuxDecoder();
// Client side Muxer
Muxer muxer = new Muxer(physical,serverRead);
DummyMuxAddClient addClient = new DummyMuxAddClient();
muxer.setAddClient(addClient);
// Server Writer
MuxEncoder serverWrite = MuxEncoder.toIncoming(physical);
// Build AddChannelRequest handshake data
StringBuilder request = new StringBuilder();
request.append("GET /echo HTTP/1.1\r\n");
request.append("Host: localhost\r\n");
request.append("Upgrade: websocket\r\n");
request.append("Connection: Upgrade\r\n");
request.append("Sec-WebSocket-Key: ZDTIRU5vU9xOfkg8JAgN3A==\r\n");
request.append("Sec-WebSocket-Version: 13\r\n");
request.append("\r\n");
// Build AddChannelRequest
long channelId = 1L;
MuxAddChannelRequest req = new MuxAddChannelRequest();
req.setChannelId(channelId);
req.setEncoding((byte)0);
req.setHandshake(request.toString());
// Have client sent AddChannelRequest
MuxChannel channel = muxer.getChannel(channelId,true);
MuxEncoder clientWrite = MuxEncoder.toOutgoing(channel);
clientWrite.op(req);
// Have server read request
serverRead.assertHasOp(MuxOp.ADD_CHANNEL_REQUEST,1);
// prepare AddChannelResponse
StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 101 Switching Protocols\r\n");
response.append("Upgrade: websocket\r\n");
response.append("Connection: upgrade\r\n");
response.append("Sec-WebSocket-Accept: Kgo85/8KVE8YPONSeyhgL3GwqhI=\r\n");
response.append("\r\n");
MuxAddChannelResponse resp = new MuxAddChannelResponse();
resp.setChannelId(channelId);
resp.setFailed(false);
resp.setEncoding((byte)0);
resp.setHandshake(resp.toString());
// Server writes add channel response
serverWrite.op(resp);
// TODO: handle the upgrade on client side.
}
}

View File

@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add;
import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxInjector;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxDecoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxEncoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxReducer;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
@ -32,6 +31,7 @@ import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -42,22 +42,31 @@ public class MuxerAddServerTest
public TestName testname = new TestName();
@Test
@Ignore("Interrim, not functional yet")
public void testAddChannel_Server() throws Exception
{
// Server side physical connection
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newServerPolicy());
physical.onOpen();
MuxReducer reducer = new MuxReducer();
// Client reader
MuxDecoder clientRead = new MuxDecoder();
// Represents a server side muxer.
Muxer muxer = new Muxer(physical,reducer);
// Build up server side muxer.
Muxer muxer = new Muxer(physical,clientRead);
DummyMuxAddServer addServer = new DummyMuxAddServer();
muxer.setAddServer(addServer);
MuxInjector inject = new MuxInjector(muxer);
// Wire up physical connection to forward incoming frames to muxer
physical.setIncoming(muxer);
// Trigger AddChannel
// Client simulator
// Can inject mux encapsulated frames into physical connection as if from
// physical connection.
MuxEncoder clientWrite = MuxEncoder.toIncoming(physical);
// Build AddChannelRequest handshake data
StringBuilder request = new StringBuilder();
request.append("GET /echo HTTP/1.1\r\n");
request.append("Host: localhost\r\n");
@ -67,27 +76,29 @@ public class MuxerAddServerTest
request.append("Sec-WebSocket-Version: 13\r\n");
request.append("\r\n");
// Build AddChannelRequest
MuxAddChannelRequest req = new MuxAddChannelRequest();
req.setChannelId(1);
req.setEnc((byte)0);
req.setHandshake(BufferUtil.toBuffer(request.toString()));
req.setEncoding((byte)0);
req.setHandshake(request.toString());
inject.op(req);
// Have client sent AddChannelRequest
clientWrite.op(req);
// Make sure we got AddChannelResponse
reducer.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1);
MuxAddChannelResponse response = (MuxAddChannelResponse)reducer.getOps().pop();
// Make sure client got AddChannelResponse
clientRead.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1);
MuxAddChannelResponse response = (MuxAddChannelResponse)clientRead.getOps().pop();
Assert.assertThat("AddChannelResponse.channelId",response.getChannelId(),is(1L));
Assert.assertThat("AddChannelResponse.failed",response.isFailed(),is(false));
Assert.assertThat("AddChannelResponse.handshake",response.getHandshake(),notNullValue());
Assert.assertThat("AddChannelResponse.handshakeSize",response.getHandshakeSize(),is(57L));
reducer.reset();
clientRead.reset();
// Send simple echo request
inject.frame(1,WebSocketFrame.text("Hello World"));
clientWrite.frame(1,WebSocketFrame.text("Hello World"));
// Test for echo response (is there a user echo websocket connected to the sub-channel?)
reducer.assertHasFrame(OpCode.TEXT,1L,1);
clientRead.assertHasFrame(OpCode.TEXT,1L,1);
}
}

View File

@ -23,17 +23,21 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.ConnectionState;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
import org.junit.rules.TestName;
public class LocalWebSocketConnection implements WebSocketConnection
public class LocalWebSocketConnection implements WebSocketConnection, IncomingFrames
{
private final String id;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private boolean open = false;
private IncomingFrames incoming;
public LocalWebSocketConnection()
{
@ -68,6 +72,11 @@ public class LocalWebSocketConnection implements WebSocketConnection
open = false;
}
public IncomingFrames getIncoming()
{
return incoming;
}
@Override
public WebSocketPolicy getPolicy()
{
@ -92,10 +101,21 @@ public class LocalWebSocketConnection implements WebSocketConnection
return null;
}
@Override
public void incoming(WebSocketException e)
{
incoming.incoming(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
incoming.incoming(frame);
}
@Override
public boolean isInputClosed()
{
// TODO Auto-generated method stub
return false;
}
@ -108,7 +128,6 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override
public boolean isOutputClosed()
{
// TODO Auto-generated method stub
return false;
}
@ -121,18 +140,27 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override
public void onCloseHandshake(boolean incoming, CloseInfo close)
{
// TODO Auto-generated method stub
}
public void onOpen() {
open = true;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
}
public void setIncoming(IncomingFrames incoming)
{
this.incoming = incoming;
}
public void setPolicy(WebSocketPolicy policy)
{
this.policy = policy;

View File

@ -91,29 +91,43 @@ public class ServletWebSocketRequest extends HttpServletRequestWrapper implement
return extensions;
}
@Override
public Map<String, List<String>> getHeaders()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHost()
{
return getHeader("Host");
}
@Override
public String getHttpVersion()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
/**
* Get the endpoint of the WebSocket connection.
* <p>
* Per the <a href="https://tools.ietf.org/html/rfc6455#section-1.3">Opening Handshake (RFC 6455)</a>
*/
@Override
public String getHttpEndPointName()
public String getRemoteURI()
{
return getRequestURI();
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
@Override
public List<String> getSubProtocols()
{

View File

@ -0,0 +1,60 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
public class UpgradeContext
{
private InternalConnection connection;
private UpgradeRequest request;
private UpgradeResponse response;
public InternalConnection getConnection()
{
return connection;
}
public UpgradeRequest getRequest()
{
return request;
}
public UpgradeResponse getResponse()
{
return response;
}
public void setConnection(InternalConnection connection)
{
this.connection = connection;
}
public void setRequest(UpgradeRequest request)
{
this.request = request;
}
public void setResponse(UpgradeResponse response)
{
this.response = response;
}
}

View File

@ -18,27 +18,37 @@
package org.eclipse.jetty.websocket.server;
import java.util.List;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
public class WebSocketServerConnection extends AbstractWebSocketConnection
{
private final WebSocketServerFactory factory;
private boolean connected;
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy,
ByteBufferPool bufferPool, WebSocketServerFactory factory)
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
WebSocketServerFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);
this.factory = factory;
this.connected = false;
}
@Override
public void configureFromExtensions(List<Extension> extensions)
{
getParser().configureFromExtensions(extensions);
getGenerator().configureFromExtensions(extensions);
}
@Override
public void onClose()
{
@ -56,4 +66,10 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
}
super.onOpen();
}
@Override
public void setIncoming(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
}

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver;
@ -65,6 +66,18 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
public static UpgradeContext getActiveUpgradeContext()
{
return ACTIVE_CONTEXT.get();
}
protected static void setActiveUpgradeContext(UpgradeContext connection)
{
ACTIVE_CONTEXT.set(connection);
}
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
{
handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
@ -125,6 +138,15 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
WebSocketCreator creator = getCreator();
UpgradeContext context = getActiveUpgradeContext();
if (context == null)
{
context = new UpgradeContext();
setActiveUpgradeContext(context);
}
context.setRequest(sockreq);
context.setResponse(sockresp);
Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
// Handle response forbidden (and similar paths)
@ -357,14 +379,20 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
}
// Create connection
UpgradeContext context = getActiveUpgradeContext();
InternalConnection connection = context.getConnection();
if (connection == null)
{
HttpConnection http = HttpConnection.getCurrentConnection();
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketServerConnection connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
LOG.debug("HttpConnection: {}",http);
LOG.debug("AsyncWebSocketConnection: {}",connection);
}
// Initialize / Negotiate Extensions
WebSocketSession session = new WebSocketSession(driver,connection,getPolicy(),response.getAcceptedSubProtocol());
@ -379,8 +407,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Connect extensions
if (extensions != null)
{
connection.getParser().configureFromExtensions(extensions);
connection.getGenerator().configureFromExtensions(extensions);
connection.configureFromExtensions(extensions);
Iterator<Extension> extIter;
// Connect outgoings
@ -406,7 +433,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// configure session for outgoing flows
session.setOutgoing(outgoing);
// configure connection for incoming flows
connection.getParser().setIncomingFramesHandler(incoming);
connection.setIncoming(incoming);
// Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -88,6 +89,8 @@ public abstract class WebSocketServlet extends HttpServlet
private final Logger LOG = Log.getLogger(getClass());
private WebSocketServerFactory webSocketFactory;
public abstract void configure(WebSocketServerFactory factory);
@Override
public void destroy()
{
@ -146,8 +149,6 @@ public abstract class WebSocketServlet extends HttpServlet
}
}
public abstract void configure(WebSocketServerFactory factory);
/**
* @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
*/

View File

@ -0,0 +1,47 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server.mux;
import java.nio.ByteBuffer;
import org.eclipse.jetty.server.HttpInput;
/**
* HttpInput for Empty Http body sections.
*/
public class EmptyHttpInput extends HttpInput<ByteBuffer>
{
@Override
protected int get(ByteBuffer item, byte[] buffer, int offset, int length)
{
return 0;
}
@Override
protected void onContentConsumed(ByteBuffer item)
{
// do nothing
}
@Override
protected int remaining(ByteBuffer item)
{
return 0;
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.server.mux;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
@ -26,12 +28,12 @@ import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
/**
* Process incoming AddChannelRequest headers within the existing Jetty framework. Benefitting from Server container knowledge and various webapp configuration
* Process incoming AddChannelRequest headers within the existing Jetty framework. Benefiting from Server container knowledge and various webapp configuration
* knowledge.
*/
public class HttpChannelOverMux extends HttpChannel<String>
public class HttpChannelOverMux extends HttpChannel<ByteBuffer>
{
public HttpChannelOverMux(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<String> input)
public HttpChannelOverMux(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{
super(connector,configuration,endPoint,transport,input);
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server.mux;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
/**
* Take {@link ResponseInfo} objects and convert to bytes for response.
*/
public class HttpTransportOverMux implements HttpTransport
{
private static final Logger LOG = Log.getLogger(HttpTransportOverMux.class);
private final BlockingCallback streamBlocker = new BlockingCallback();
public HttpTransportOverMux(Muxer muxer, MuxChannel channel)
{
// TODO Auto-generated constructor stub
}
@Override
public void completed()
{
LOG.debug("completed");
}
/**
* Process ResponseInfo object into AddChannelResponse
*/
@Override
public void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent) throws IOException
{
send(info,responseBodyContent,lastContent,streamBlocker.getPhase(),streamBlocker);
try
{
streamBlocker.block();
}
catch (IOException e)
{
throw e;
}
catch (Exception e)
{
throw new EofException(e);
}
}
@Override
public <C> void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent, C context, Callback<C> callback)
{
if (lastContent == false)
{
// throw error
}
if (info.getContentLength() > 0)
{
// throw error
}
// prepare the AddChannelResponse
// TODO: look at HttpSender in jetty-client for generator loop logic
}
}

View File

@ -20,8 +20,17 @@ package org.eclipse.jetty.websocket.server.mux;
import java.io.IOException;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.add.MuxAddServer;
/**
@ -29,20 +38,73 @@ import org.eclipse.jetty.websocket.core.extensions.mux.add.MuxAddServer;
*/
public class MuxAddHandler implements MuxAddServer
{
/** Represents physical connector */
private Connector connector;
/** Used for local address */
private EndPoint endPoint;
/** The original request handshake */
private UpgradeRequest baseHandshakeRequest;
/** The original request handshake */
private UpgradeResponse baseHandshakeResponse;
private int maximumHeaderSize = 32 * 1024;
@Override
public UpgradeRequest getPhysicalHandshakeRequest()
{
// TODO Auto-generated method stub
return null;
}
@Override
public UpgradeResponse getPhysicalHandshakeResponse()
{
// TODO Auto-generated method stub
return null;
}
/**
* An incoming MuxAddChannel request.
*
* @param the
* channel this request should be bound to
* @param requestHandshake
* the incoming request headers
* @param request
* the incoming request headers (complete and merged if delta encoded)
* @return the outgoing response headers
*/
@Override
public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException
public void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException
{
// Need to call into HttpChannel to get the websocket properly setup.
HttpTransportOverMux transport = new HttpTransportOverMux(muxer,channel);
EmptyHttpInput input = new EmptyHttpInput();
HttpConfiguration configuration = new HttpConfiguration();
return null;
HttpChannelOverMux httpChannel = new HttpChannelOverMux(//
connector,configuration,endPoint,transport,input);
HttpMethod method = HttpMethod.fromString(request.getMethod());
HttpVersion version = HttpVersion.fromString(request.getHttpVersion());
httpChannel.startRequest(method,request.getMethod(),request.getRemoteURI(),version);
for (String headerName : request.getHeaders().keySet())
{
HttpHeader header = HttpHeader.lookAheadGet(headerName.getBytes(),0,headerName.length());
for (String value : request.getHeaders().get(headerName))
{
httpChannel.parsedHeader(header,headerName,value);
}
}
httpChannel.headerComplete();
httpChannel.messageComplete();
httpChannel.run(); // calls into server for appropriate resource
// TODO: what's in request handshake is not enough to process the request.
// like a partial http request. (consider this a AddChannelRequest failure)
throw new MuxException("Not a valid request");
}
}