Attempting to get Mux sub-channel to flow thru server as well

This commit is contained in:
Joakim Erdfelt 2012-11-02 11:20:36 -07:00
parent dd0b1a39b8
commit 80f5fa1a18
35 changed files with 1300 additions and 190 deletions

View File

@ -222,6 +222,13 @@ public class ClientUpgradeRequest implements UpgradeRequest
return headers.get(name); return headers.get(name);
} }
@Override
public Map<String, List<String>> getHeaders()
{
// TODO Auto-generated method stub
return null;
}
@Override @Override
public String getHost() public String getHost()
{ {
@ -229,9 +236,10 @@ public class ClientUpgradeRequest implements UpgradeRequest
} }
@Override @Override
public String getHttpEndPointName() public String getHttpVersion()
{ {
return httpEndPointName; // TODO Auto-generated method stub
return null;
} }
public String getKey() public String getKey()
@ -239,12 +247,39 @@ public class ClientUpgradeRequest implements UpgradeRequest
return key; return key;
} }
@Override
public String getMethod()
{
// TODO Auto-generated method stub
return null;
}
@Override @Override
public String getOrigin() public String getOrigin()
{ {
return getHeader("Origin"); return getHeader("Origin");
} }
@Override
public Map<String, String[]> getParameterMap()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getQueryString()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getRemoteURI()
{
return httpEndPointName;
}
@Override @Override
public List<String> getSubProtocols() public List<String> getSubProtocols()
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.client.internal.io; package org.eclipse.jetty.websocket.client.internal.io;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -25,7 +26,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory; import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient; import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker; import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection; import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/** /**
@ -47,6 +50,12 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
this.masker = client.getMasker(); this.masker = client.getMasker();
} }
@Override
public void configureFromExtensions(List<Extension> extensions)
{
/* do nothing */
}
public DefaultWebSocketClient getClient() public DefaultWebSocketClient getClient()
{ {
return client; return client;
@ -57,7 +66,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
{ {
super.onClose(); super.onClose();
factory.sessionClosed(getSession()); factory.sessionClosed(getSession());
}; }
@Override @Override
public void onOpen() public void onOpen()
@ -68,7 +77,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
connected = true; connected = true;
} }
super.onOpen(); super.onOpen();
} };
@Override @Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
@ -76,4 +85,10 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
masker.setMask(frame); masker.setMask(frame);
super.output(context,callback,frame); super.output(context,callback,frame);
} }
@Override
public void setIncoming(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
} }

View File

@ -20,25 +20,12 @@ package org.eclipse.jetty.websocket.core.api;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo; import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.ConnectionState; import org.eclipse.jetty.websocket.core.protocol.ConnectionState;
/** public interface LogicalConnection extends OutgoingFrames
* Base Connection concepts
*/
public interface BaseConnection
{ {
/**
* Connection suspend token
*/
public static interface SuspendToken
{
/**
* Resume a previously suspended connection.
*/
void resume();
}
/** /**
* Send a websocket Close frame, without a status code or reason. * Send a websocket Close frame, without a status code or reason.
* <p> * <p>
@ -47,7 +34,7 @@ public interface BaseConnection
* @see StatusCode * @see StatusCode
* @see #close(int, String) * @see #close(int, String)
*/ */
void close(); public void close();
/** /**
* Send a websocket Close frame, with status code. * Send a websocket Close frame, with status code.
@ -60,7 +47,7 @@ public interface BaseConnection
* the (optional) reason. (can be null for no reason) * the (optional) reason. (can be null for no reason)
* @see StatusCode * @see StatusCode
*/ */
void close(int statusCode, String reason); public void close(int statusCode, String reason);
/** /**
* Terminate the connection (no close frame sent) * Terminate the connection (no close frame sent)

View File

@ -0,0 +1,30 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.api;
/**
* Connection suspend token
*/
public interface SuspendToken
{
/**
* Resume a previously suspended connection.
*/
void resume();
}

View File

@ -33,12 +33,22 @@ public interface UpgradeRequest
public String getHeader(String name); public String getHeader(String name);
public Map<String, List<String>> getHeaders();
public String getHost(); public String getHost();
public String getHttpEndPointName(); public String getHttpVersion();
public String getMethod();
public String getOrigin(); public String getOrigin();
public Map<String,String[]> getParameterMap();
public String getQueryString();
public String getRemoteURI();
public List<String> getSubProtocols(); public List<String> getSubProtocols();
public boolean hasSubProtocol(String test); public boolean hasSubProtocol(String test);

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.util.Callback;
/** /**
* Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>. * Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>.
*/ */
public interface WebSocketConnection extends BaseConnection public interface WebSocketConnection extends LogicalConnection
{ {
/** /**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection. * Access the (now read-only) {@link WebSocketPolicy} in use for this connection.

View File

@ -30,13 +30,14 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.BaseConnection.SuspendToken;
import org.eclipse.jetty.websocket.core.api.Extension; import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.StatusCode; import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection; import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException; import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.IncomingFrames; import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames; import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession; import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo; import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
@ -46,7 +47,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/** /**
* MuxChannel, acts as WebSocketConnection for specific sub-channel. * MuxChannel, acts as WebSocketConnection for specific sub-channel.
*/ */
public class MuxChannel implements WebSocketConnection, IncomingFrames, OutgoingFrames, SuspendToken public class MuxChannel implements WebSocketConnection, InternalConnection, IncomingFrames, OutgoingFrames, SuspendToken
{ {
private static final Logger LOG = Log.getLogger(MuxChannel.class); private static final Logger LOG = Log.getLogger(MuxChannel.class);
@ -95,6 +96,12 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing
} }
} }
@Override
public void configureFromExtensions(List<Extension> extensions)
{
/* ignore */
}
@Override @Override
public void disconnect() public void disconnect()
{ {
@ -251,10 +258,16 @@ public class MuxChannel implements WebSocketConnection, IncomingFrames, Outgoing
} }
} }
@Override
public void setIncoming(IncomingFrames incoming)
{
this.incoming = incoming;
}
@Override
public void setSession(WebSocketSession session) public void setSession(WebSocketSession session)
{ {
this.session = session; this.session = session;
this.incoming = session;
session.setOutgoing(this); session.setOutgoing(this);
} }

View File

@ -81,7 +81,7 @@ public class MuxGenerator
MuxAddChannelRequest op = (MuxAddChannelRequest)block; MuxAddChannelRequest op = (MuxAddChannelRequest)block;
byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
b |= (byte)((op.getRsv() & 0x07) << 2); // rsv b |= (byte)((op.getRsv() & 0x07) << 2); // rsv
b |= (op.getEnc() & 0x03); // enc b |= (op.getEncoding() & 0x03); // enc
payload.put(b); // opcode + rsv + enc payload.put(b); // opcode + rsv + enc
writeChannelId(payload,op.getChannelId()); writeChannelId(payload,op.getChannelId());
write139Buffer(payload,op.getHandshake()); write139Buffer(payload,op.getHandshake());
@ -93,7 +93,7 @@ public class MuxGenerator
byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
b |= (op.isFailed()?0x10:0x00); // failure bit b |= (op.isFailed()?0x10:0x00); // failure bit
b |= (byte)((op.getRsv() & 0x03) << 2); // rsv b |= (byte)((op.getRsv() & 0x03) << 2); // rsv
b |= (op.getEnc() & 0x03); // enc b |= (op.getEncoding() & 0x03); // enc
payload.put(b); // opcode + f + rsv + enc payload.put(b); // opcode + f + rsv + enc
writeChannelId(payload,op.getChannelId()); writeChannelId(payload,op.getChannelId());
if (op.getHandshake() != null) if (op.getHandshake() != null)

View File

@ -147,7 +147,7 @@ public class MuxParser
{ {
MuxAddChannelRequest op = new MuxAddChannelRequest(); MuxAddChannelRequest op = new MuxAddChannelRequest();
op.setRsv((byte)((b & 0x1C) >> 2)); op.setRsv((byte)((b & 0x1C) >> 2));
op.setEnc((byte)(b & 0x03)); op.setEncoding((byte)(b & 0x03));
op.setChannelId(readChannelId(buffer)); op.setChannelId(readChannelId(buffer));
long handshakeSize = read139EncodedSize(buffer); long handshakeSize = read139EncodedSize(buffer);
op.setHandshake(readBlock(buffer,handshakeSize)); op.setHandshake(readBlock(buffer,handshakeSize));
@ -159,7 +159,7 @@ public class MuxParser
MuxAddChannelResponse op = new MuxAddChannelResponse(); MuxAddChannelResponse op = new MuxAddChannelResponse();
op.setFailed((b & 0x10) != 0); op.setFailed((b & 0x10) != 0);
op.setRsv((byte)((byte)(b & 0x0C) >> 2)); op.setRsv((byte)((byte)(b & 0x0C) >> 2));
op.setEnc((byte)(b & 0x03)); op.setEncoding((byte)(b & 0x03));
op.setChannelId(readChannelId(buffer)); op.setChannelId(readChannelId(buffer));
long handshakeSize = read139EncodedSize(buffer); long handshakeSize = read139EncodedSize(buffer);
op.setHandshake(readBlock(buffer,handshakeSize)); op.setHandshake(readBlock(buffer,handshakeSize));

View File

@ -0,0 +1,217 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
public class MuxRequest implements UpgradeRequest
{
public static final String HEADER_VALUE_DELIM="\"\\\n\r\t\f\b%+ ;=";
public static UpgradeRequest merge(UpgradeRequest baseReq, UpgradeRequest deltaReq)
{
MuxRequest req = new MuxRequest(baseReq);
req.method = overlay(deltaReq.getMethod(),req.getMethod());
// TODO: finish
return req;
}
private static String overlay(String val, String defVal)
{
if (val == null)
{
return defVal;
}
return val;
}
public static UpgradeRequest parse(ByteBuffer handshake)
{
MuxRequest req = new MuxRequest();
// TODO Auto-generated method stub
return req;
}
private String method;
private String httpVersion;
private String remoteURI;
private String queryString;
private List<String> subProtocols;
private Map<String, String> cookies;
private List<ExtensionConfig> extensions;
private Map<String, List<String>> headers;
private Map<String, String[]> parameterMap;
public MuxRequest()
{
// TODO Auto-generated constructor stub
}
public MuxRequest(UpgradeRequest copy)
{
// TODO Auto-generated constructor stub
}
@Override
public void addExtensions(String... extConfigs)
{
for (String extConfig : extConfigs)
{
extensions.add(ExtensionConfig.parse(extConfig));
}
}
@Override
public Map<String, String> getCookieMap()
{
return cookies;
}
@Override
public List<ExtensionConfig> getExtensions()
{
return extensions;
}
@Override
public String getHeader(String name)
{
List<String> values = headers.get(name);
// not set
if ((values == null) || (values.isEmpty()))
{
return null;
}
// only 1 value (most common scenario)
if (values.size() == 1)
{
return values.get(0);
}
// merge multiple values together
StringBuilder ret = new StringBuilder();
boolean delim = false;
for (String value : values)
{
if (delim)
{
ret.append(", ");
}
QuotedStringTokenizer.quoteIfNeeded(ret,value,HEADER_VALUE_DELIM);
delim = true;
}
return ret.toString();
}
@Override
public Map<String, List<String>> getHeaders()
{
return headers;
}
@Override
public String getHost()
{
return getHeader("Host");
}
@Override
public String getHttpVersion()
{
return httpVersion;
}
@Override
public String getMethod()
{
return method;
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
@Override
public Map<String, String[]> getParameterMap()
{
return parameterMap;
}
@Override
public String getQueryString()
{
return queryString;
}
@Override
public String getRemoteURI()
{
return remoteURI;
}
@Override
public List<String> getSubProtocols()
{
return subProtocols;
}
@Override
public boolean hasSubProtocol(String test)
{
for (String protocol : subProtocols)
{
if (protocol.equalsIgnoreCase(test))
{
return true;
}
}
return false;
}
@Override
public boolean isOrigin(String test)
{
return test.equalsIgnoreCase(getOrigin());
}
@Override
public void setSubProtocols(String protocols)
{
this.subProtocols.clear();
if (StringUtil.isBlank(protocols))
{
return;
}
for (String protocol : protocols.split("\\s*,\\s*"))
{
this.subProtocols.add(protocol);
}
}
}

View File

@ -0,0 +1,129 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.websocket.core.api.UpgradeException;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.protocol.ExtensionConfig;
public class MuxResponse implements UpgradeResponse
{
@Override
public void addHeader(String name, String value)
{
// TODO Auto-generated method stub
}
@Override
public String getAcceptedSubProtocol()
{
// TODO Auto-generated method stub
return null;
}
@Override
public List<ExtensionConfig> getExtensions()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Set<String> getHeaderNamesSet()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHeaderValue(String name)
{
// TODO Auto-generated method stub
return null;
}
@Override
public Iterator<String> getHeaderValues(String name)
{
// TODO Auto-generated method stub
return null;
}
@Override
public int getStatusCode()
{
// TODO Auto-generated method stub
return 0;
}
@Override
public String getStatusReason()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isSuccess()
{
// TODO Auto-generated method stub
return false;
}
@Override
public void sendForbidden(String message) throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void setAcceptedSubProtocol(String protocol)
{
// TODO Auto-generated method stub
}
@Override
public void setExtensions(List<ExtensionConfig> extensions)
{
// TODO Auto-generated method stub
}
@Override
public void setHeader(String name, String value)
{
// TODO Auto-generated method stub
}
@Override
public void validateWebSocketHash(String expectedHash) throws UpgradeException
{
// TODO Auto-generated method stub
}
}

View File

@ -24,12 +24,12 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.StatusCode; import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.WebSocketBehavior; import org.eclipse.jetty.websocket.core.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection; import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException; import org.eclipse.jetty.websocket.core.api.WebSocketException;
@ -98,6 +98,29 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
return addServer; return addServer;
} }
public MuxChannel getChannel(long channelId, boolean create)
{
if (channelId == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
MuxChannel channel = channels.get(channelId);
if (channel == null)
{
if (create)
{
channel = new MuxChannel(channelId,this);
channels.put(channelId,channel);
}
else
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
}
return channel;
}
public WebSocketPolicy getPolicy() public WebSocketPolicy getPolicy()
{ {
return policy; return policy;
@ -193,39 +216,35 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set"); throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
} }
if (request.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Pre-allocate channel. // Pre-allocate channel.
long channelId = request.getChannelId(); long channelId = request.getChannelId();
MuxChannel channel = new MuxChannel(channelId,this); MuxChannel channel = getChannel(channelId, true);
this.channels.put(channelId,channel);
// submit to upgrade handshake process // submit to upgrade handshake process
try try
{ {
String requestHandshake = BufferUtil.toUTF8String(request.getHandshake()); switch (request.getEncoding())
if (request.isDeltaEncoded())
{ {
// Merge original request headers out of physical connection. case MuxAddChannelRequest.IDENTITY_ENCODING:
requestHandshake = mergeHeaders(physicalRequestHeaders,requestHandshake); {
} UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
String responseHandshake = addServer.handshake(channel,requestHandshake); addServer.handshake(this,channel,idenReq);
if (StringUtil.isNotBlank(responseHandshake)) break;
{ }
// Upgrade Success case MuxAddChannelRequest.DELTA_ENCODING:
MuxAddChannelResponse response = new MuxAddChannelResponse(); {
response.setChannelId(request.getChannelId()); UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
response.setFailed(false); UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
response.setHandshake(responseHandshake); UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
// send response
this.generator.generate(response); addServer.handshake(this,channel,mergedReq);
} break;
else }
{ default:
// TODO: trigger error? {
// TODO: ERROR
break;
}
} }
} }
catch (Throwable t) catch (Throwable t)
@ -250,18 +269,9 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set"); throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
} }
if (response.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Process channel // Process channel
long channelId = response.getChannelId(); long channelId = response.getChannelId();
MuxChannel channel = this.channels.get(channelId); MuxChannel channel = getChannel(channelId,false);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
// Process Response headers // Process Response headers
try try
@ -288,18 +298,9 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
@Override @Override
public void onMuxDropChannel(MuxDropChannel drop) public void onMuxDropChannel(MuxDropChannel drop)
{ {
if (drop.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
// Process channel // Process channel
long channelId = drop.getChannelId(); long channelId = drop.getChannelId();
MuxChannel channel = this.channels.get(channelId); MuxChannel channel = getChannel(channelId,false);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
String reason = "Mux " + drop.toString(); String reason = "Mux " + drop.toString();
reason = StringUtil.truncate(reason,(WebSocketFrame.MAX_CONTROL_PAYLOAD - 2)); reason = StringUtil.truncate(reason,(WebSocketFrame.MAX_CONTROL_PAYLOAD - 2));
@ -335,11 +336,6 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
@Override @Override
public void onMuxFlowControl(MuxFlowControl flow) public void onMuxFlowControl(MuxFlowControl flow)
{ {
if (flow.getChannelId() == CONTROL_CHANNEL_ID)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
}
if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL) if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
{ {
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow"); throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
@ -347,11 +343,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
// Process channel // Process channel
long channelId = flow.getChannelId(); long channelId = flow.getChannelId();
MuxChannel channel = this.channels.get(channelId); MuxChannel channel = getChannel(channelId,false);
if (channel == null)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
}
// TODO: set channel quota // TODO: set channel quota
} }
@ -394,6 +386,18 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
generator.output(context,callback,channelId,frame); generator.output(context,callback,channelId,frame);
} }
/**
* Write an OP out the physical connection.
*
* @param op
* the mux operation to write
* @throws IOException
*/
public void output(MuxControlBlock op) throws IOException
{
generator.generate(op);
}
public void setAddClient(MuxAddClient addClient) public void setAddClient(MuxAddClient addClient)
{ {
this.addClient = addClient; this.addClient = addClient;
@ -420,6 +424,6 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
@Override @Override
public String toString() public String toString()
{ {
return String.format("Muxer[subChannels.size=%d]", channels.size()); return String.format("Muxer[subChannels.size=%d]",channels.size());
} }
} }

View File

@ -20,8 +20,11 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add;
import java.io.IOException; import java.io.IOException;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel; import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException; import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.io.WebSocketSession; import org.eclipse.jetty.websocket.core.io.WebSocketSession;
/** /**
@ -29,6 +32,10 @@ import org.eclipse.jetty.websocket.core.io.WebSocketSession;
*/ */
public interface MuxAddServer public interface MuxAddServer
{ {
public UpgradeRequest getPhysicalHandshakeRequest();
public UpgradeResponse getPhysicalHandshakeResponse();
/** /**
* Perform the handshake. * Perform the handshake.
* *
@ -36,11 +43,10 @@ public interface MuxAddServer
* the channel to attach the {@link WebSocketSession} to. * the channel to attach the {@link WebSocketSession} to.
* @param requestHandshake * @param requestHandshake
* the request handshake (request headers) * the request handshake (request headers)
* @return the response handshake (the response headers)
* @throws AbstractMuxException * @throws AbstractMuxException
* if unable to handshake * if unable to handshake
* @throws IOException * @throws IOException
* if unable to parse request headers * if unable to parse request headers
*/ */
String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException; void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException;
} }

View File

@ -20,13 +20,18 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock; import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp; import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
public class MuxAddChannelRequest implements MuxControlBlock public class MuxAddChannelRequest implements MuxControlBlock
{ {
public static final byte IDENTITY_ENCODING = (byte)0x00;
public static final byte DELTA_ENCODING = (byte)0x01;
private long channelId = -1; private long channelId = -1;
private byte enc; private byte encoding;
private ByteBuffer handshake; private ByteBuffer handshake;
private byte rsv; private byte rsv;
@ -35,9 +40,9 @@ public class MuxAddChannelRequest implements MuxControlBlock
return channelId; return channelId;
} }
public byte getEnc() public byte getEncoding()
{ {
return enc; return encoding;
} }
public ByteBuffer getHandshake() public ByteBuffer getHandshake()
@ -67,12 +72,12 @@ public class MuxAddChannelRequest implements MuxControlBlock
public boolean isDeltaEncoded() public boolean isDeltaEncoded()
{ {
return (enc == 1); return (encoding == DELTA_ENCODING);
} }
public boolean isIdentityEncoded() public boolean isIdentityEncoded()
{ {
return (enc == 0); return (encoding == IDENTITY_ENCODING);
} }
public void setChannelId(long channelId) public void setChannelId(long channelId)
@ -80,9 +85,9 @@ public class MuxAddChannelRequest implements MuxControlBlock
this.channelId = channelId; this.channelId = channelId;
} }
public void setEnc(byte enc) public void setEncoding(byte enc)
{ {
this.enc = enc; this.encoding = enc;
} }
public void setHandshake(ByteBuffer handshake) public void setHandshake(ByteBuffer handshake)
@ -97,6 +102,11 @@ public class MuxAddChannelRequest implements MuxControlBlock
} }
} }
public void setHandshake(String rawstring)
{
setHandshake(BufferUtil.toBuffer(rawstring,StringUtil.__UTF8_CHARSET));
}
public void setRsv(byte rsv) public void setRsv(byte rsv)
{ {
this.rsv = rsv; this.rsv = rsv;

View File

@ -21,13 +21,17 @@ package org.eclipse.jetty.websocket.core.extensions.mux.op;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock; import org.eclipse.jetty.websocket.core.extensions.mux.MuxControlBlock;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp; import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
public class MuxAddChannelResponse implements MuxControlBlock public class MuxAddChannelResponse implements MuxControlBlock
{ {
public static final byte IDENTITY_ENCODING = (byte)0x00;
public static final byte DELTA_ENCODING = (byte)0x01;
private long channelId; private long channelId;
private byte enc; private byte encoding;
private byte rsv; private byte rsv;
private boolean failed = false; private boolean failed = false;
private ByteBuffer handshake; private ByteBuffer handshake;
@ -37,9 +41,9 @@ public class MuxAddChannelResponse implements MuxControlBlock
return channelId; return channelId;
} }
public byte getEnc() public byte getEncoding()
{ {
return enc; return encoding;
} }
public ByteBuffer getHandshake() public ByteBuffer getHandshake()
@ -67,19 +71,29 @@ public class MuxAddChannelResponse implements MuxControlBlock
return rsv; return rsv;
} }
public boolean isDeltaEncoded()
{
return (encoding == DELTA_ENCODING);
}
public boolean isFailed() public boolean isFailed()
{ {
return failed; return failed;
} }
public boolean isIdentityEncoded()
{
return (encoding == IDENTITY_ENCODING);
}
public void setChannelId(long channelId) public void setChannelId(long channelId)
{ {
this.channelId = channelId; this.channelId = channelId;
} }
public void setEnc(byte enc) public void setEncoding(byte enc)
{ {
this.enc = enc; this.encoding = enc;
} }
public void setFailed(boolean failed) public void setFailed(boolean failed)
@ -101,7 +115,7 @@ public class MuxAddChannelResponse implements MuxControlBlock
public void setHandshake(String responseHandshake) public void setHandshake(String responseHandshake)
{ {
setHandshake(BufferUtil.toBuffer(responseHandshake)); setHandshake(BufferUtil.toBuffer(responseHandshake,StringUtil.__UTF8_CHARSET));
} }
public void setRsv(byte rsv) public void setRsv(byte rsv)

View File

@ -37,9 +37,9 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.api.BaseConnection;
import org.eclipse.jetty.websocket.core.api.CloseException; import org.eclipse.jetty.websocket.core.api.CloseException;
import org.eclipse.jetty.websocket.core.api.StatusCode; import org.eclipse.jetty.websocket.core.api.StatusCode;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection; import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo; import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
@ -53,7 +53,7 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/** /**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/ */
public abstract class AbstractWebSocketConnection extends AbstractConnection implements BaseConnection, BaseConnection.SuspendToken, OutgoingFrames public abstract class AbstractWebSocketConnection extends AbstractConnection implements InternalConnection, SuspendToken
{ {
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class); private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames"); private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io;
import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/**
* Utility class to pipe {@link IncomingFrames} and {@link OutgoingFrames} around
*/
public class FramePipes
{
private static class In2Out implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(In2Out.class);
private OutgoingFrames outgoing;
public In2Out(OutgoingFrames outgoing)
{
this.outgoing = outgoing;
}
@Override
public void incoming(WebSocketException e)
{
/* cannot send exception on */
}
@Override
public void incoming(WebSocketFrame frame)
{
try
{
this.outgoing.output(null,new FutureCallback<>(),frame);
}
catch (IOException e)
{
LOG.debug(e);
}
}
}
private static class Out2In implements OutgoingFrames
{
private IncomingFrames incoming;
public Out2In(IncomingFrames incoming)
{
this.incoming = incoming;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
this.incoming.incoming(frame);
}
}
public static OutgoingFrames to(final IncomingFrames incoming)
{
return new Out2In(incoming);
}
public static IncomingFrames to(final OutgoingFrames outgoing)
{
return new In2Out(outgoing);
}
}

View File

@ -0,0 +1,33 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.io;
import java.util.List;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.LogicalConnection;
public interface InternalConnection extends LogicalConnection
{
void configureFromExtensions(List<Extension> extensions);
void setIncoming(IncomingFrames incoming);
void setSession(WebSocketSession session);
}

View File

@ -26,7 +26,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.BaseConnection; import org.eclipse.jetty.websocket.core.api.LogicalConnection;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection; import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException; import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
@ -41,17 +42,17 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
private static final Logger LOG = Log.getLogger(WebSocketSession.class); private static final Logger LOG = Log.getLogger(WebSocketSession.class);
/** /**
* The reference to the base connection. * The reference to the logical connection.
* <p> * <p>
* This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxConnection when MUX is in the picture. * This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxChannel when MUX is in the picture.
*/ */
private final BaseConnection baseConnection; private final LogicalConnection baseConnection;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final String subprotocol; private final String subprotocol;
private final EventDriver websocket; private final EventDriver websocket;
private OutgoingFrames outgoing; private OutgoingFrames outgoing;
public WebSocketSession(EventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol) public WebSocketSession(EventDriver websocket, LogicalConnection connection, WebSocketPolicy policy, String subprotocol)
{ {
super(); super();
this.websocket = websocket; this.websocket = websocket;

View File

@ -27,13 +27,13 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/** /**
* Helpful utility class to parse arbitrary mux events from a physical connection's OutgoingFrames. * Helpful utility class to parse arbitrary mux events from a physical connection's OutgoingFrames.
* *
* @see MuxInjector * @see MuxEncoder
*/ */
public class MuxReducer extends MuxEventCapture implements OutgoingFrames public class MuxDecoder extends MuxEventCapture implements OutgoingFrames
{ {
private MuxParser parser; private MuxParser parser;
public MuxReducer() public MuxDecoder()
{ {
parser = new MuxParser(); parser = new MuxParser();
parser.setEvents(this); parser.setEvents(this);

View File

@ -20,9 +20,7 @@ package org.eclipse.jetty.websocket.core.extensions.mux;
import java.io.IOException; import java.io.IOException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.io.FramePipes;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.io.IncomingFrames; import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames; import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
@ -30,19 +28,26 @@ import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
/** /**
* Helpful utility class to send arbitrary mux events into a physical connection's IncomingFrames. * Helpful utility class to send arbitrary mux events into a physical connection's IncomingFrames.
* *
* @see MuxReducer * @see MuxDecoder
*/ */
public class MuxInjector implements OutgoingFrames public class MuxEncoder
{ {
private static final Logger LOG = Log.getLogger(MuxInjector.class); public static MuxEncoder toIncoming(IncomingFrames incoming)
private IncomingFrames incoming; {
return new MuxEncoder(FramePipes.to(incoming));
}
public static MuxEncoder toOutgoing(OutgoingFrames outgoing)
{
return new MuxEncoder(outgoing);
}
private MuxGenerator generator; private MuxGenerator generator;
public MuxInjector(IncomingFrames incoming) private MuxEncoder(OutgoingFrames outgoing)
{ {
this.incoming = incoming;
this.generator = new MuxGenerator(); this.generator = new MuxGenerator();
this.generator.setOutgoing(this); this.generator.setOutgoing(outgoing);
} }
public void frame(long channelId, WebSocketFrame frame) throws IOException public void frame(long channelId, WebSocketFrame frame) throws IOException
@ -54,11 +59,4 @@ public class MuxInjector implements OutgoingFrames
{ {
this.generator.generate(op); this.generator.generate(op);
} }
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
LOG.debug("Injecting {} to {}",frame,incoming);
this.incoming.incoming(frame);
}
} }

View File

@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux.add;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.WebSocketSession;
public class DummyMuxAddClient implements MuxAddClient
{
@Override
public WebSocketSession createSession(MuxAddChannelResponse response)
{
// TODO Auto-generated method stub
return null;
}
}

View File

@ -22,10 +22,14 @@ import java.io.IOException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.examples.echo.AdapterEchoSocket; import org.eclipse.jetty.websocket.core.examples.echo.AdapterEchoSocket;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel; import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException; import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.WebSocketSession; import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver; import org.eclipse.jetty.websocket.core.io.event.EventDriver;
import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory; import org.eclipse.jetty.websocket.core.io.event.EventDriverFactory;
@ -49,7 +53,21 @@ public class DummyMuxAddServer implements MuxAddServer
} }
@Override @Override
public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException public UpgradeRequest getPhysicalHandshakeRequest()
{
// TODO Auto-generated method stub
return null;
}
@Override
public UpgradeResponse getPhysicalHandshakeResponse()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException
{ {
StringBuilder response = new StringBuilder(); StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 101 Switching Protocols\r\n"); response.append("HTTP/1.1 101 Switching Protocols\r\n");
@ -65,6 +83,12 @@ public class DummyMuxAddServer implements MuxAddServer
channel.onOpen(); channel.onOpen();
session.onConnect(); session.onConnect();
return response.toString(); MuxAddChannelResponse addChannelResponse = new MuxAddChannelResponse();
addChannelResponse.setChannelId(channel.getChannelId());
addChannelResponse.setEncoding(MuxAddChannelResponse.IDENTITY_ENCODING);
addChannelResponse.setFailed(false);
addChannelResponse.setHandshake(response.toString());
muxer.output(addChannelResponse);
} }
} }

View File

@ -0,0 +1,104 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions.mux.add;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxDecoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxEncoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class MuxerAddClientTest
{
@Rule
public TestName testname = new TestName();
@Test
@Ignore("Interrim, not functional yet")
public void testAddChannel_Client() throws Exception
{
// Client side physical socket
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newClientPolicy());
physical.onOpen();
// Server Reader
MuxDecoder serverRead = new MuxDecoder();
// Client side Muxer
Muxer muxer = new Muxer(physical,serverRead);
DummyMuxAddClient addClient = new DummyMuxAddClient();
muxer.setAddClient(addClient);
// Server Writer
MuxEncoder serverWrite = MuxEncoder.toIncoming(physical);
// Build AddChannelRequest handshake data
StringBuilder request = new StringBuilder();
request.append("GET /echo HTTP/1.1\r\n");
request.append("Host: localhost\r\n");
request.append("Upgrade: websocket\r\n");
request.append("Connection: Upgrade\r\n");
request.append("Sec-WebSocket-Key: ZDTIRU5vU9xOfkg8JAgN3A==\r\n");
request.append("Sec-WebSocket-Version: 13\r\n");
request.append("\r\n");
// Build AddChannelRequest
long channelId = 1L;
MuxAddChannelRequest req = new MuxAddChannelRequest();
req.setChannelId(channelId);
req.setEncoding((byte)0);
req.setHandshake(request.toString());
// Have client sent AddChannelRequest
MuxChannel channel = muxer.getChannel(channelId,true);
MuxEncoder clientWrite = MuxEncoder.toOutgoing(channel);
clientWrite.op(req);
// Have server read request
serverRead.assertHasOp(MuxOp.ADD_CHANNEL_REQUEST,1);
// prepare AddChannelResponse
StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 101 Switching Protocols\r\n");
response.append("Upgrade: websocket\r\n");
response.append("Connection: upgrade\r\n");
response.append("Sec-WebSocket-Accept: Kgo85/8KVE8YPONSeyhgL3GwqhI=\r\n");
response.append("\r\n");
MuxAddChannelResponse resp = new MuxAddChannelResponse();
resp.setChannelId(channelId);
resp.setFailed(false);
resp.setEncoding((byte)0);
resp.setHandshake(resp.toString());
// Server writes add channel response
serverWrite.op(resp);
// TODO: handle the upgrade on client side.
}
}

View File

@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.core.extensions.mux.add;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxInjector; import org.eclipse.jetty.websocket.core.extensions.mux.MuxDecoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxEncoder;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp; import org.eclipse.jetty.websocket.core.extensions.mux.MuxOp;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxReducer;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer; import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse; import org.eclipse.jetty.websocket.core.extensions.mux.op.MuxAddChannelResponse;
@ -32,6 +31,7 @@ import org.eclipse.jetty.websocket.core.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.core.protocol.OpCode; import org.eclipse.jetty.websocket.core.protocol.OpCode;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@ -42,22 +42,31 @@ public class MuxerAddServerTest
public TestName testname = new TestName(); public TestName testname = new TestName();
@Test @Test
@Ignore("Interrim, not functional yet")
public void testAddChannel_Server() throws Exception public void testAddChannel_Server() throws Exception
{ {
// Server side physical connection
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname); LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newServerPolicy()); physical.setPolicy(WebSocketPolicy.newServerPolicy());
physical.onOpen(); physical.onOpen();
MuxReducer reducer = new MuxReducer(); // Client reader
MuxDecoder clientRead = new MuxDecoder();
// Represents a server side muxer. // Build up server side muxer.
Muxer muxer = new Muxer(physical,reducer); Muxer muxer = new Muxer(physical,clientRead);
DummyMuxAddServer addServer = new DummyMuxAddServer(); DummyMuxAddServer addServer = new DummyMuxAddServer();
muxer.setAddServer(addServer); muxer.setAddServer(addServer);
MuxInjector inject = new MuxInjector(muxer); // Wire up physical connection to forward incoming frames to muxer
physical.setIncoming(muxer);
// Trigger AddChannel // Client simulator
// Can inject mux encapsulated frames into physical connection as if from
// physical connection.
MuxEncoder clientWrite = MuxEncoder.toIncoming(physical);
// Build AddChannelRequest handshake data
StringBuilder request = new StringBuilder(); StringBuilder request = new StringBuilder();
request.append("GET /echo HTTP/1.1\r\n"); request.append("GET /echo HTTP/1.1\r\n");
request.append("Host: localhost\r\n"); request.append("Host: localhost\r\n");
@ -67,27 +76,29 @@ public class MuxerAddServerTest
request.append("Sec-WebSocket-Version: 13\r\n"); request.append("Sec-WebSocket-Version: 13\r\n");
request.append("\r\n"); request.append("\r\n");
// Build AddChannelRequest
MuxAddChannelRequest req = new MuxAddChannelRequest(); MuxAddChannelRequest req = new MuxAddChannelRequest();
req.setChannelId(1); req.setChannelId(1);
req.setEnc((byte)0); req.setEncoding((byte)0);
req.setHandshake(BufferUtil.toBuffer(request.toString())); req.setHandshake(request.toString());
inject.op(req); // Have client sent AddChannelRequest
clientWrite.op(req);
// Make sure we got AddChannelResponse // Make sure client got AddChannelResponse
reducer.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1); clientRead.assertHasOp(MuxOp.ADD_CHANNEL_RESPONSE,1);
MuxAddChannelResponse response = (MuxAddChannelResponse)reducer.getOps().pop(); MuxAddChannelResponse response = (MuxAddChannelResponse)clientRead.getOps().pop();
Assert.assertThat("AddChannelResponse.channelId",response.getChannelId(),is(1L)); Assert.assertThat("AddChannelResponse.channelId",response.getChannelId(),is(1L));
Assert.assertThat("AddChannelResponse.failed",response.isFailed(),is(false)); Assert.assertThat("AddChannelResponse.failed",response.isFailed(),is(false));
Assert.assertThat("AddChannelResponse.handshake",response.getHandshake(),notNullValue()); Assert.assertThat("AddChannelResponse.handshake",response.getHandshake(),notNullValue());
Assert.assertThat("AddChannelResponse.handshakeSize",response.getHandshakeSize(),is(57L)); Assert.assertThat("AddChannelResponse.handshakeSize",response.getHandshakeSize(),is(57L));
reducer.reset(); clientRead.reset();
// Send simple echo request // Send simple echo request
inject.frame(1,WebSocketFrame.text("Hello World")); clientWrite.frame(1,WebSocketFrame.text("Hello World"));
// Test for echo response (is there a user echo websocket connected to the sub-channel?) // Test for echo response (is there a user echo websocket connected to the sub-channel?)
reducer.assertHasFrame(OpCode.TEXT,1L,1); clientRead.assertHasFrame(OpCode.TEXT,1L,1);
} }
} }

View File

@ -23,17 +23,21 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.api.SuspendToken;
import org.eclipse.jetty.websocket.core.api.WebSocketConnection; import org.eclipse.jetty.websocket.core.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.protocol.CloseInfo; import org.eclipse.jetty.websocket.core.protocol.CloseInfo;
import org.eclipse.jetty.websocket.core.protocol.ConnectionState; import org.eclipse.jetty.websocket.core.protocol.ConnectionState;
import org.eclipse.jetty.websocket.core.protocol.WebSocketFrame;
import org.junit.rules.TestName; import org.junit.rules.TestName;
public class LocalWebSocketConnection implements WebSocketConnection public class LocalWebSocketConnection implements WebSocketConnection, IncomingFrames
{ {
private final String id; private final String id;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private boolean open = false; private boolean open = false;
private IncomingFrames incoming;
public LocalWebSocketConnection() public LocalWebSocketConnection()
{ {
@ -68,6 +72,11 @@ public class LocalWebSocketConnection implements WebSocketConnection
open = false; open = false;
} }
public IncomingFrames getIncoming()
{
return incoming;
}
@Override @Override
public WebSocketPolicy getPolicy() public WebSocketPolicy getPolicy()
{ {
@ -92,10 +101,21 @@ public class LocalWebSocketConnection implements WebSocketConnection
return null; return null;
} }
@Override
public void incoming(WebSocketException e)
{
incoming.incoming(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
incoming.incoming(frame);
}
@Override @Override
public boolean isInputClosed() public boolean isInputClosed()
{ {
// TODO Auto-generated method stub
return false; return false;
} }
@ -108,7 +128,6 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override @Override
public boolean isOutputClosed() public boolean isOutputClosed()
{ {
// TODO Auto-generated method stub
return false; return false;
} }
@ -121,18 +140,27 @@ public class LocalWebSocketConnection implements WebSocketConnection
@Override @Override
public void onCloseHandshake(boolean incoming, CloseInfo close) public void onCloseHandshake(boolean incoming, CloseInfo close)
{ {
// TODO Auto-generated method stub
} }
public void onOpen() { public void onOpen() {
open = true; open = true;
} }
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
}
@Override @Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{ {
} }
public void setIncoming(IncomingFrames incoming)
{
this.incoming = incoming;
}
public void setPolicy(WebSocketPolicy policy) public void setPolicy(WebSocketPolicy policy)
{ {
this.policy = policy; this.policy = policy;

View File

@ -91,29 +91,43 @@ public class ServletWebSocketRequest extends HttpServletRequestWrapper implement
return extensions; return extensions;
} }
@Override
public Map<String, List<String>> getHeaders()
{
// TODO Auto-generated method stub
return null;
}
@Override @Override
public String getHost() public String getHost()
{ {
return getHeader("Host"); return getHeader("Host");
} }
@Override
public String getHttpVersion()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getOrigin()
{
return getHeader("Origin");
}
/** /**
* Get the endpoint of the WebSocket connection. * Get the endpoint of the WebSocket connection.
* <p> * <p>
* Per the <a href="https://tools.ietf.org/html/rfc6455#section-1.3">Opening Handshake (RFC 6455)</a> * Per the <a href="https://tools.ietf.org/html/rfc6455#section-1.3">Opening Handshake (RFC 6455)</a>
*/ */
@Override @Override
public String getHttpEndPointName() public String getRemoteURI()
{ {
return getRequestURI(); return getRequestURI();
} }
@Override
public String getOrigin()
{
return getHeader("Origin");
}
@Override @Override
public List<String> getSubProtocols() public List<String> getSubProtocols()
{ {

View File

@ -0,0 +1,60 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
public class UpgradeContext
{
private InternalConnection connection;
private UpgradeRequest request;
private UpgradeResponse response;
public InternalConnection getConnection()
{
return connection;
}
public UpgradeRequest getRequest()
{
return request;
}
public UpgradeResponse getResponse()
{
return response;
}
public void setConnection(InternalConnection connection)
{
this.connection = connection;
}
public void setRequest(UpgradeRequest request)
{
this.request = request;
}
public void setResponse(UpgradeResponse response)
{
this.response = response;
}
}

View File

@ -18,27 +18,37 @@
package org.eclipse.jetty.websocket.server; package org.eclipse.jetty.websocket.server;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.api.Extension;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection; import org.eclipse.jetty.websocket.core.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.core.io.IncomingFrames;
public class WebSocketServerConnection extends AbstractWebSocketConnection public class WebSocketServerConnection extends AbstractWebSocketConnection
{ {
private final WebSocketServerFactory factory; private final WebSocketServerFactory factory;
private boolean connected; private boolean connected;
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
ByteBufferPool bufferPool, WebSocketServerFactory factory) WebSocketServerFactory factory)
{ {
super(endp,executor,scheduler,policy,bufferPool); super(endp,executor,scheduler,policy,bufferPool);
this.factory = factory; this.factory = factory;
this.connected = false; this.connected = false;
} }
@Override
public void configureFromExtensions(List<Extension> extensions)
{
getParser().configureFromExtensions(extensions);
getGenerator().configureFromExtensions(extensions);
}
@Override @Override
public void onClose() public void onClose()
{ {
@ -56,4 +66,10 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
} }
super.onOpen(); super.onOpen();
} }
@Override
public void setIncoming(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
} }

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.websocket.core.api.WebSocketException;
import org.eclipse.jetty.websocket.core.api.WebSocketPolicy; import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.core.extensions.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.core.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.io.IncomingFrames; import org.eclipse.jetty.websocket.core.io.IncomingFrames;
import org.eclipse.jetty.websocket.core.io.InternalConnection;
import org.eclipse.jetty.websocket.core.io.OutgoingFrames; import org.eclipse.jetty.websocket.core.io.OutgoingFrames;
import org.eclipse.jetty.websocket.core.io.WebSocketSession; import org.eclipse.jetty.websocket.core.io.WebSocketSession;
import org.eclipse.jetty.websocket.core.io.event.EventDriver; import org.eclipse.jetty.websocket.core.io.event.EventDriver;
@ -65,6 +66,18 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
{ {
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class); private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
public static UpgradeContext getActiveUpgradeContext()
{
return ACTIVE_CONTEXT.get();
}
protected static void setActiveUpgradeContext(UpgradeContext connection)
{
ACTIVE_CONTEXT.set(connection);
}
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>(); private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
{ {
handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455()); handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
@ -125,6 +138,15 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
WebSocketCreator creator = getCreator(); WebSocketCreator creator = getCreator();
UpgradeContext context = getActiveUpgradeContext();
if (context == null)
{
context = new UpgradeContext();
setActiveUpgradeContext(context);
}
context.setRequest(sockreq);
context.setResponse(sockresp);
Object websocketPojo = creator.createWebSocket(sockreq,sockresp); Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
// Handle response forbidden (and similar paths) // Handle response forbidden (and similar paths)
@ -357,14 +379,20 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
} }
// Create connection // Create connection
HttpConnection http = HttpConnection.getCurrentConnection(); UpgradeContext context = getActiveUpgradeContext();
EndPoint endp = http.getEndPoint(); InternalConnection connection = context.getConnection();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketServerConnection connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
LOG.debug("HttpConnection: {}",http); if (connection == null)
LOG.debug("AsyncWebSocketConnection: {}",connection); {
HttpConnection http = HttpConnection.getCurrentConnection();
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
LOG.debug("HttpConnection: {}",http);
LOG.debug("AsyncWebSocketConnection: {}",connection);
}
// Initialize / Negotiate Extensions // Initialize / Negotiate Extensions
WebSocketSession session = new WebSocketSession(driver,connection,getPolicy(),response.getAcceptedSubProtocol()); WebSocketSession session = new WebSocketSession(driver,connection,getPolicy(),response.getAcceptedSubProtocol());
@ -379,8 +407,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Connect extensions // Connect extensions
if (extensions != null) if (extensions != null)
{ {
connection.getParser().configureFromExtensions(extensions); connection.configureFromExtensions(extensions);
connection.getGenerator().configureFromExtensions(extensions);
Iterator<Extension> extIter; Iterator<Extension> extIter;
// Connect outgoings // Connect outgoings
@ -406,7 +433,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// configure session for outgoing flows // configure session for outgoing flows
session.setOutgoing(outgoing); session.setOutgoing(outgoing);
// configure connection for incoming flows // configure connection for incoming flows
connection.getParser().setIncomingFramesHandler(incoming); connection.setIncoming(incoming);
// Tell jetty about the new connection // Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection); request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.server; package org.eclipse.jetty.websocket.server;
import java.io.IOException; import java.io.IOException;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -37,14 +38,14 @@ import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
* appropriate conditions. * appropriate conditions.
* <p> * <p>
* The most basic implementation would be as follows. * The most basic implementation would be as follows.
* *
* <pre> * <pre>
* package my.example; * package my.example;
* *
* import javax.servlet.http.HttpServletRequest; * import javax.servlet.http.HttpServletRequest;
* import org.eclipse.jetty.websocket.WebSocket; * import org.eclipse.jetty.websocket.WebSocket;
* import org.eclipse.jetty.websocket.server.WebSocketServlet; * import org.eclipse.jetty.websocket.server.WebSocketServlet;
* *
* public class MyEchoServlet extends WebSocketServlet * public class MyEchoServlet extends WebSocketServlet
* { * {
* &#064;Override * &#064;Override
@ -54,29 +55,29 @@ import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
* } * }
* } * }
* </pre> * </pre>
* *
* Note: that only request that conforms to a "WebSocket: Upgrade" handshake request will trigger the {@link WebSocketServerFactory} handling of creating * Note: that only request that conforms to a "WebSocket: Upgrade" handshake request will trigger the {@link WebSocketServerFactory} handling of creating
* WebSockets.<br> * WebSockets.<br>
* All other requests are treated as normal servlet requests. * All other requests are treated as normal servlet requests.
* *
* <p> * <p>
* <b>Configuration / Init-Parameters:</b><br> * <b>Configuration / Init-Parameters:</b><br>
* Note: If you use the {@link WebSocket &#064;WebSocket} annotation, these configuration settings can be specified on a per WebSocket basis, vs a per Servlet * Note: If you use the {@link WebSocket &#064;WebSocket} annotation, these configuration settings can be specified on a per WebSocket basis, vs a per Servlet
* basis. * basis.
* *
* <dl> * <dl>
* <dt>bufferSize</dt> * <dt>bufferSize</dt>
* <dd>can be used to set the buffer size, which is also the max frame byte size<br> * <dd>can be used to set the buffer size, which is also the max frame byte size<br>
* <i>Default: 8192</i></dd> * <i>Default: 8192</i></dd>
* *
* <dt>maxIdleTime</dt> * <dt>maxIdleTime</dt>
* <dd>set the time in ms that a websocket may be idle before closing<br> * <dd>set the time in ms that a websocket may be idle before closing<br>
* <i>Default:</i></dd> * <i>Default:</i></dd>
* *
* <dt>maxTextMessagesSize</dt> * <dt>maxTextMessagesSize</dt>
* <dd>set the size in characters that a websocket may be accept before closing<br> * <dd>set the size in characters that a websocket may be accept before closing<br>
* <i>Default:</i></dd> * <i>Default:</i></dd>
* *
* <dt>maxBinaryMessagesSize</dt> * <dt>maxBinaryMessagesSize</dt>
* <dd>set the size in bytes that a websocket may be accept before closing<br> * <dd>set the size in bytes that a websocket may be accept before closing<br>
* <i>Default:</i></dd> * <i>Default:</i></dd>
@ -88,6 +89,8 @@ public abstract class WebSocketServlet extends HttpServlet
private final Logger LOG = Log.getLogger(getClass()); private final Logger LOG = Log.getLogger(getClass());
private WebSocketServerFactory webSocketFactory; private WebSocketServerFactory webSocketFactory;
public abstract void configure(WebSocketServerFactory factory);
@Override @Override
public void destroy() public void destroy()
{ {
@ -146,8 +149,6 @@ public abstract class WebSocketServlet extends HttpServlet
} }
} }
public abstract void configure(WebSocketServerFactory factory);
/** /**
* @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse) * @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
*/ */

View File

@ -0,0 +1,47 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server.mux;
import java.nio.ByteBuffer;
import org.eclipse.jetty.server.HttpInput;
/**
* HttpInput for Empty Http body sections.
*/
public class EmptyHttpInput extends HttpInput<ByteBuffer>
{
@Override
protected int get(ByteBuffer item, byte[] buffer, int offset, int length)
{
return 0;
}
@Override
protected void onContentConsumed(ByteBuffer item)
{
// do nothing
}
@Override
protected int remaining(ByteBuffer item)
{
return 0;
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.server.mux; package org.eclipse.jetty.websocket.server.mux;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
@ -26,12 +28,12 @@ import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.server.HttpTransport;
/** /**
* Process incoming AddChannelRequest headers within the existing Jetty framework. Benefitting from Server container knowledge and various webapp configuration * Process incoming AddChannelRequest headers within the existing Jetty framework. Benefiting from Server container knowledge and various webapp configuration
* knowledge. * knowledge.
*/ */
public class HttpChannelOverMux extends HttpChannel<String> public class HttpChannelOverMux extends HttpChannel<ByteBuffer>
{ {
public HttpChannelOverMux(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<String> input) public HttpChannelOverMux(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{ {
super(connector,configuration,endPoint,transport,input); super(connector,configuration,endPoint,transport,input);
} }

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server.mux;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
/**
* Take {@link ResponseInfo} objects and convert to bytes for response.
*/
public class HttpTransportOverMux implements HttpTransport
{
private static final Logger LOG = Log.getLogger(HttpTransportOverMux.class);
private final BlockingCallback streamBlocker = new BlockingCallback();
public HttpTransportOverMux(Muxer muxer, MuxChannel channel)
{
// TODO Auto-generated constructor stub
}
@Override
public void completed()
{
LOG.debug("completed");
}
/**
* Process ResponseInfo object into AddChannelResponse
*/
@Override
public void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent) throws IOException
{
send(info,responseBodyContent,lastContent,streamBlocker.getPhase(),streamBlocker);
try
{
streamBlocker.block();
}
catch (IOException e)
{
throw e;
}
catch (Exception e)
{
throw new EofException(e);
}
}
@Override
public <C> void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent, C context, Callback<C> callback)
{
if (lastContent == false)
{
// throw error
}
if (info.getContentLength() > 0)
{
// throw error
}
// prepare the AddChannelResponse
// TODO: look at HttpSender in jetty-client for generator loop logic
}
}

View File

@ -20,8 +20,17 @@ package org.eclipse.jetty.websocket.server.mux;
import java.io.IOException; import java.io.IOException;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.websocket.core.api.UpgradeRequest;
import org.eclipse.jetty.websocket.core.api.UpgradeResponse;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel; import org.eclipse.jetty.websocket.core.extensions.mux.MuxChannel;
import org.eclipse.jetty.websocket.core.extensions.mux.MuxException; import org.eclipse.jetty.websocket.core.extensions.mux.MuxException;
import org.eclipse.jetty.websocket.core.extensions.mux.Muxer;
import org.eclipse.jetty.websocket.core.extensions.mux.add.MuxAddServer; import org.eclipse.jetty.websocket.core.extensions.mux.add.MuxAddServer;
/** /**
@ -29,20 +38,73 @@ import org.eclipse.jetty.websocket.core.extensions.mux.add.MuxAddServer;
*/ */
public class MuxAddHandler implements MuxAddServer public class MuxAddHandler implements MuxAddServer
{ {
/** Represents physical connector */
private Connector connector;
/** Used for local address */
private EndPoint endPoint;
/** The original request handshake */
private UpgradeRequest baseHandshakeRequest;
/** The original request handshake */
private UpgradeResponse baseHandshakeResponse;
private int maximumHeaderSize = 32 * 1024;
@Override
public UpgradeRequest getPhysicalHandshakeRequest()
{
// TODO Auto-generated method stub
return null;
}
@Override
public UpgradeResponse getPhysicalHandshakeResponse()
{
// TODO Auto-generated method stub
return null;
}
/** /**
* An incoming MuxAddChannel request. * An incoming MuxAddChannel request.
* *
* @param the * @param the
* channel this request should be bound to * channel this request should be bound to
* @param requestHandshake * @param request
* the incoming request headers * the incoming request headers (complete and merged if delta encoded)
* @return the outgoing response headers * @return the outgoing response headers
*/ */
@Override @Override
public String handshake(MuxChannel channel, String requestHandshake) throws MuxException, IOException public void handshake(Muxer muxer, MuxChannel channel, UpgradeRequest request) throws MuxException, IOException
{ {
// Need to call into HttpChannel to get the websocket properly setup. // Need to call into HttpChannel to get the websocket properly setup.
HttpTransportOverMux transport = new HttpTransportOverMux(muxer,channel);
EmptyHttpInput input = new EmptyHttpInput();
HttpConfiguration configuration = new HttpConfiguration();
return null; HttpChannelOverMux httpChannel = new HttpChannelOverMux(//
connector,configuration,endPoint,transport,input);
HttpMethod method = HttpMethod.fromString(request.getMethod());
HttpVersion version = HttpVersion.fromString(request.getHttpVersion());
httpChannel.startRequest(method,request.getMethod(),request.getRemoteURI(),version);
for (String headerName : request.getHeaders().keySet())
{
HttpHeader header = HttpHeader.lookAheadGet(headerName.getBytes(),0,headerName.length());
for (String value : request.getHeaders().get(headerName))
{
httpChannel.parsedHeader(header,headerName,value);
}
}
httpChannel.headerComplete();
httpChannel.messageComplete();
httpChannel.run(); // calls into server for appropriate resource
// TODO: what's in request handshake is not enough to process the request.
// like a partial http request. (consider this a AddChannelRequest failure)
throw new MuxException("Not a valid request");
} }
} }