Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

Conflicts:
	jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java
This commit is contained in:
Greg Wilkins 2012-07-13 22:59:05 +10:00
commit acbf20af27
37 changed files with 679 additions and 488 deletions

View File

@ -99,7 +99,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endPoint,executor,scheduler,policy,bufferPool);
endPoint.setAsyncConnection(connection);
connection.getParser().setListener(websocket);
connection.getParser().setIncomingFramesHandler(websocket);
// TODO: track open websockets? bind open websocket to connection?

View File

@ -0,0 +1,28 @@
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
/**
* Exception to terminate the connection because it has received data within a frame payload that was not consistent with the requirements of that frame
* payload. (eg: not UTF-8 in a text frame, or a bad data seen in the {@link DeflateFrameExtension})
*
* @see StatusCode#BAD_PAYLOAD
*/
@SuppressWarnings("serial")
public class BadPayloadException extends CloseException
{
public BadPayloadException(String message)
{
super(StatusCode.BAD_PAYLOAD,message);
}
public BadPayloadException(String message, Throwable t)
{
super(StatusCode.BAD_PAYLOAD,message,t);
}
public BadPayloadException(Throwable t)
{
super(StatusCode.BAD_PAYLOAD,t);
}
}

View File

@ -15,12 +15,131 @@
//========================================================================
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public interface Extension
public abstract class Extension implements OutgoingFrames, IncomingFrames
{
public ExtensionConfig getConfig();
public String getName();
public String getParameterizedName();
public void setConfig(ExtensionConfig config);
private WebSocketPolicy policy;
private ByteBufferPool bufferPool;
private ExtensionConfig config;
private IncomingFrames nextIncomingFrames;
private OutgoingFrames nextOutgoingFrames;
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public ExtensionConfig getConfig()
{
return config;
}
public String getName()
{
return config.getName();
}
public IncomingFrames getNextIncomingFrames()
{
return nextIncomingFrames;
}
public OutgoingFrames getNextOutgoingFrames()
{
return nextOutgoingFrames;
}
public String getParameterizedName()
{
return config.getParameterizedName();
}
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public void incoming(WebSocketException e)
{
// pass thru, un-modified
nextIncomingFrames.incoming(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
// pass thru, un-modified
nextIncomingFrames.incoming(frame);
}
/**
* Convenience method for {@link #getNextIncomingFrames()#incoming(WebSocketException)}
*
* @param e
* the exception to pass to the next input/incoming
*/
public void nextIncoming(WebSocketException e)
{
nextIncomingFrames.incoming(e);
}
/**
* Convenience method for {@link #getNextIncomingFrames()#incoming(WebSocketFrame)}
*
* @param frame
* the frame to send to the next input/incoming
*/
public void nextIncoming(WebSocketFrame frame)
{
nextIncomingFrames.incoming(frame);
}
/**
* Convenience method for {@link #getNextOutgoingFrames()#output(WebSocketFrame)}
*
* @param frame
* the frame to send to the next output
*/
public void nextOutput(WebSocketFrame frame)
{
nextOutgoingFrames.output(frame);
}
@Override
public void output(WebSocketFrame frame)
{
// pass thru, un-modified
nextOutgoingFrames.output(frame);
}
public void setBufferPool(ByteBufferPool bufferPool)
{
this.bufferPool = bufferPool;
}
public void setConfig(ExtensionConfig config)
{
this.config = config;
}
public void setNextIncomingFrames(IncomingFrames nextIncomingFramesHandler)
{
this.nextIncomingFrames = nextIncomingFramesHandler;
}
public void setNextOutgoingFrames(OutgoingFrames nextOutgoingFramesHandler)
{
this.nextOutgoingFrames = nextOutgoingFramesHandler;
}
public void setPolicy(WebSocketPolicy policy)
{
this.policy = policy;
}
}

View File

@ -17,8 +17,6 @@ package org.eclipse.jetty.websocket.api;
import java.util.Iterator;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public interface ExtensionRegistry extends Iterable<Class<? extends Extension>>
{
public boolean isAvailable(String name);
@ -26,8 +24,6 @@ public interface ExtensionRegistry extends Iterable<Class<? extends Extension>>
@Override
public Iterator<Class<? extends Extension>> iterator();
public Extension newInstance(ExtensionConfig config);
public void register(String name, Class<? extends Extension> extension);
public void unregister(String name);

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.MessageInputStream;
import org.eclipse.jetty.websocket.io.MessageReader;
import org.eclipse.jetty.websocket.io.RawConnection;
@ -40,7 +41,6 @@ import org.eclipse.jetty.websocket.io.StreamAppender;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Frame;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
@ -51,7 +51,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
* <p>
* There will be an instance of the WebSocketEventDriver per connection.
*/
public class WebSocketEventDriver implements Parser.Listener
public class WebSocketEventDriver implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketEventDriver.class);
private final Object websocket;
@ -109,16 +109,24 @@ public class WebSocketEventDriver implements Parser.Listener
return websocket;
}
/**
* Internal entry point for connection established
*/
public void onConnect()
@Override
public void incoming(WebSocketException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onConnect()",websocket.getClass().getSimpleName());
LOG.debug("{}.incoming({})",websocket.getClass().getSimpleName(),e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
if (events.onException != null)
{
events.onException.call(websocket,connection,e);
}
events.onConnect.call(websocket,connection);
}
/**
@ -128,7 +136,7 @@ public class WebSocketEventDriver implements Parser.Listener
* the frame that appeared
*/
@Override
public void onFrame(WebSocketFrame frame)
public void incoming(WebSocketFrame frame)
{
if (LOG.isDebugEnabled())
{
@ -321,24 +329,16 @@ public class WebSocketEventDriver implements Parser.Listener
}
}
@Override
public void onWebSocketException(WebSocketException e)
/**
* Internal entry point for connection established
*/
public void onConnect()
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onWebSocketException({})",websocket.getClass().getSimpleName(),e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
if (events.onException != null)
{
events.onException.call(websocket,connection,e);
LOG.debug("{}.onConnect()",websocket.getClass().getSimpleName());
}
events.onConnect.call(websocket,connection);
}
/**

View File

@ -1,61 +0,0 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.extensions;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class AbstractExtension implements Extension
{
private final ExtensionConfig config;
public AbstractExtension(String name)
{
this.config = new ExtensionConfig(name);
}
@Override
public ExtensionConfig getConfig()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getName()
{
return config.getName();
}
@Override
public String getParameterizedName()
{
return config.getParameterizedName();
}
@Override
public void setConfig(ExtensionConfig config)
{
this.config.init(config);
}
@Override
public String toString()
{
return getParameterizedName();
}
}

View File

@ -21,19 +21,33 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class WebSocketExtensionRegistry implements ExtensionRegistry
{
private Map<String, Class<? extends Extension>> registry;
private WebSocketPolicy policy;
private ByteBufferPool bufferPool;
public WebSocketExtensionRegistry()
public WebSocketExtensionRegistry(WebSocketPolicy policy, ByteBufferPool bufferPool)
{
registry = new HashMap<String, Class<? extends Extension>>();
this.policy = policy;
this.bufferPool = bufferPool;
this.registry = new HashMap<>();
this.registry.put("identity",IdentityExtension.class);
this.registry.put("fragment",FragmentExtension.class);
this.registry.put("x-deflate-frame",DeflateFrameExtension.class);
}
@Override
@ -56,7 +70,6 @@ public class WebSocketExtensionRegistry implements ExtensionRegistry
}
}
@Override
public Extension newInstance(ExtensionConfig config)
{
if (config == null)
@ -78,6 +91,8 @@ public class WebSocketExtensionRegistry implements ExtensionRegistry
{
Extension ext = extClass.newInstance();
ext.setConfig(config);
ext.setPolicy(policy);
ext.setBufferPool(bufferPool);
return ext;
}
catch (InstantiationException | IllegalAccessException e)

View File

@ -15,28 +15,201 @@
//========================================================================
package org.eclipse.jetty.websocket.extensions.deflate;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.extensions.AbstractExtension;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* @TODO Implement proposed deflate frame draft
*/
public class DeflateFrameExtension extends AbstractExtension
public class DeflateFrameExtension extends Extension
{
private static final Logger LOG = Log.getLogger(DeflateFrameExtension.class);
private int _minLength=8;
private Deflater _deflater;
private Inflater _inflater;
private int minLength = 8;
private Deflater deflater;
private Inflater inflater;
public DeflateFrameExtension()
// TODO: bring this method into some sort of ProtocolEnforcement class to share with Parser
private void assertSanePayloadLength(WebSocketFrame frame, int len)
{
super("x-deflate-frame");
LOG.debug("Payload Length: " + len);
// Since we use ByteBuffer so often, having lengths over Integer.MAX_VALUE is really impossible.
if (len > Integer.MAX_VALUE)
{
// OMG! Sanity Check! DO NOT WANT! Won't anyone think of the memory!
throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than " + Integer.MAX_VALUE);
}
getPolicy().assertValidPayloadLength(len);
switch (frame.getOpCode())
{
case CLOSE:
if (len == 1)
{
throw new ProtocolException("Invalid close frame payload length, [" + len + "]");
}
// fall thru
case PING:
case PONG:
if (len > WebSocketFrame.MAX_CONTROL_PAYLOAD)
{
throw new ProtocolException("Invalid control frame payload length, [" + len + "] cannot exceed ["
+ WebSocketFrame.MAX_CONTROL_PAYLOAD + "]");
}
break;
}
}
@Override
public void incoming(WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame() || !frame.isRsv1())
{
// Cannot modify incoming control frames or ones with RSV1 set.
super.incoming(frame);
return;
}
ByteBuffer data = frame.getPayload();
// first 1 to 8 bytes contains post-inflated payload size.
int uncompressedLength = readUncompresseLength(frame,data);
// Set the data that is compressed to the inflater
inflater.setInput(BufferUtil.toArray(frame.getPayload()));
// Establish place for inflated data
byte buf[] = new byte[uncompressedLength];
try
{
int left = buf.length;
while (inflater.getRemaining() > 0)
{
// TODO: worry about the ByteBuffer.array here??
int inflated = inflater.inflate(buf,0,left);
if (inflated == 0)
{
throw new DataFormatException("insufficient data");
}
left -= inflated;
}
frame.setPayload(buf);
nextIncoming(frame);
}
catch (DataFormatException e)
{
LOG.warn(e);
throw new BadPayloadException(e);
}
finally
{
// release original buffer (no longer needed)
getBufferPool().release(data);
}
}
@Override
public void output(WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
{
// skip, cannot compress control frames.
super.output(frame);
return;
}
if (frame.getPayloadLength() < minLength)
{
// skip, frame too small to care compressing it.
super.output(frame);
return;
}
ByteBuffer data = frame.getPayload();
int length = frame.getPayloadLength();
// prepare the uncompressed input
deflater.reset();
deflater.setInput(BufferUtil.toArray(data));
deflater.finish();
// prepare the output buffer
byte out[] = new byte[length];
int out_offset = 0;
// write the uncompressed length
if (length > 0xFF_FF)
{
out[out_offset++] = 0x7F;
out[out_offset++] = (byte)0;
out[out_offset++] = (byte)0;
out[out_offset++] = (byte)0;
out[out_offset++] = (byte)0;
out[out_offset++] = (byte)((length >> 24) & 0xff);
out[out_offset++] = (byte)((length >> 16) & 0xff);
out[out_offset++] = (byte)((length >> 8) & 0xff);
out[out_offset++] = (byte)(length & 0xff);
}
else if (length >= 0x7E)
{
out[out_offset++] = 0x7E;
out[out_offset++] = (byte)(length >> 8);
out[out_offset++] = (byte)(length & 0xff);
}
else
{
out[out_offset++] = (byte)(length & 0x7f);
}
deflater.deflate(out,out_offset,length - out_offset);
frame.setPayload(out);
frame.setRsv1(deflater.finished());
nextOutput(frame);
// free original data buffer
getBufferPool().release(data);
}
private int readUncompresseLength(WebSocketFrame frame, ByteBuffer data)
{
int length = data.get();
int bytes = 0;
if (length == 0x7F)
{
// length 8 bytes (extended payload length)
length = 0;
bytes = 8;
}
else if (length == 0x7F)
{
// length 2 bytes (extended payload length)
length = 0;
bytes = 2;
}
while (bytes > 0)
{
byte b = data.get();
length |= (b & 0xFF) << (8 * bytes);
}
assertSanePayloadLength(frame,length);
return length;
}
@Override
@ -44,113 +217,9 @@ public class DeflateFrameExtension extends AbstractExtension
{
super.setConfig(config);
_minLength = config.getParameter("minLength",_minLength);
minLength = config.getParameter("minLength",minLength);
_deflater = new Deflater();
_inflater = new Inflater();
deflater = new Deflater();
inflater = new Inflater();
}
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.AbstractExtension#onFrame(byte, byte, org.eclipse.jetty.io.Buffer)
*/
/* TODO: Migrate to new Jetty9 IO
@Override
public void onFrame(byte flags, byte opcode, ByteBuffer buffer)
{
if (getConnection().isControl(opcode) || !isFlag(flags,1))
{
super.onFrame(flags,opcode,buffer);
return;
}
if (buffer.array()==null)
buffer=buffer.asMutableBuffer();
int length=0xff&buffer.get();
if (length>=0x7e)
{
int b=(length==0x7f)?8:2;
length=0;
while(b-->0)
length=0x100*length+(0xff&buffer.get());
}
// TODO check a max framesize
_inflater.setInput(buffer.array(),buffer.getIndex(),buffer.length());
ByteArrayBuffer buf = new ByteArrayBuffer(length);
try
{
while(_inflater.getRemaining()>0)
{
int inflated=_inflater.inflate(buf.array(),buf.putIndex(),buf.space());
if (inflated==0)
throw new DataFormatException("insufficient data");
buf.setPutIndex(buf.putIndex()+inflated);
}
super.onFrame(clearFlag(flags,1),opcode,buf);
}
catch(DataFormatException e)
{
LOG.warn(e);
getConnection().close(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,e.toString());
}
}
*/
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.AbstractExtension#addFrame(byte, byte, byte[], int, int)
*/
/* TODO: Migrate to new Jetty9 IO
@Override
public void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
{
if (getConnection().isControl(opcode) || length<_minLength)
{
super.addFrame(clearFlag(flags,1),opcode,content,offset,length);
return;
}
// prepare the uncompressed input
_deflater.reset();
_deflater.setInput(content,offset,length);
_deflater.finish();
// prepare the output buffer
byte[] out= new byte[length];
int out_offset=0;
// write the uncompressed length
if (length>0xffff)
{
out[out_offset++]=0x7f;
out[out_offset++]=(byte)0;
out[out_offset++]=(byte)0;
out[out_offset++]=(byte)0;
out[out_offset++]=(byte)0;
out[out_offset++]=(byte)((length>>24)&0xff);
out[out_offset++]=(byte)((length>>16)&0xff);
out[out_offset++]=(byte)((length>>8)&0xff);
out[out_offset++]=(byte)(length&0xff);
}
else if (length >=0x7e)
{
out[out_offset++]=0x7e;
out[out_offset++]=(byte)(length>>8);
out[out_offset++]=(byte)(length&0xff);
}
else
{
out[out_offset++]=(byte)(length&0x7f);
}
int l = _deflater.deflate(out,out_offset,length-out_offset);
if (_deflater.finished())
super.addFrame(setFlag(flags,1),opcode,out,0,l+out_offset);
else
super.addFrame(clearFlag(flags,1),opcode,content,offset,length);
}
*/
}

View File

@ -15,17 +15,83 @@
//========================================================================
package org.eclipse.jetty.websocket.extensions.fragment;
import org.eclipse.jetty.websocket.extensions.AbstractExtension;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class FragmentExtension extends AbstractExtension
public class FragmentExtension extends Extension
{
private int _maxLength=-1;
private int _minFragments=1;
private int maxLength = -1;
private int minFragments = 1;
public FragmentExtension()
@Override
public void output(WebSocketFrame frame)
{
super("fragment");
if (frame.getOpCode().isControlFrame())
{
// Cannot fragment Control Frames
getNextOutgoingFrames().output(frame);
return;
}
int fragments = 1;
int length = frame.getPayloadLength();
OpCode opcode = frame.getOpCode();
ByteBuffer payload = frame.getPayload().slice();
int originalLimit = payload.limit();
// break apart payload based on maxLength rules
if (maxLength > 0)
{
while (length > maxLength)
{
fragments++;
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(false);
payload.limit(Math.min(payload.limit() + maxLength,originalLimit));
frag.setPayload(payload);
nextOutput(frag);
length -= maxLength;
opcode = OpCode.CONTINUATION;
}
}
// break apart payload based on minimum # of fragments
if (fragments < minFragments)
{
int fragmentsLeft = (minFragments - fragments);
int fragLength = length / fragmentsLeft; // equal sized fragments
while (fragments < minFragments)
{
fragments++;
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(false);
frag.setPayload(payload);
nextOutput(frag);
length -= fragLength;
opcode = OpCode.CONTINUATION;
}
}
// output whatever is left
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
payload.limit(originalLimit);
frag.setPayload(payload);
nextOutput(frag);
}
@Override
@ -33,42 +99,7 @@ public class FragmentExtension extends AbstractExtension
{
super.setConfig(config);
_maxLength = config.getParameter("maxLength",_maxLength);
_minFragments = config.getParameter("minFragments",_minFragments);
maxLength = config.getParameter("maxLength",maxLength);
minFragments = config.getParameter("minFragments",minFragments);
}
/* TODO: Migrate to new Jetty9 IO
public void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
{
if (getConnection().isControl(opcode))
{
super.addFrame(flags,opcode,content,offset,length);
return;
}
int fragments=1;
while (_maxLength>0 && length>_maxLength)
{
fragments++;
super.addFrame((byte)(flags&~getConnection().finMask()),opcode,content,offset,_maxLength);
length-=_maxLength;
offset+=_maxLength;
opcode=getConnection().continuationOpcode();
}
while (fragments<_minFragments)
{
int frag=length/2;
fragments++;
super.addFrame((byte)(flags&0x7),opcode,content,offset,frag);
length-=frag;
offset+=frag;
opcode=getConnection().continuationOpcode();
}
super.addFrame((byte)(flags|getConnection().finMask()),opcode,content,offset,length);
}
*/
}

View File

@ -15,12 +15,32 @@
//========================================================================
package org.eclipse.jetty.websocket.extensions.identity;
import org.eclipse.jetty.websocket.extensions.AbstractExtension;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class IdentityExtension extends AbstractExtension
public class IdentityExtension extends Extension
{
public IdentityExtension()
private String id;
@Override
public void setConfig(ExtensionConfig config)
{
super("identity");
super.setConfig(config);
StringBuilder s = new StringBuilder();
s.append(config.getName());
s.append("[");
for (String param : config.getParameterKeys())
{
s.append(';').append(param).append('=').append(QuotedStringTokenizer.quoteIfNeeded(config.getParameter(param,""),";="));
}
s.append("]");
id = s.toString();
}
@Override
public String toString()
{
return id;
}
}

View File

@ -0,0 +1,14 @@
package org.eclipse.jetty.websocket.io;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Interface for dealing with Incoming Frames.
*/
public interface IncomingFrames
{
public void incoming(WebSocketException e);
public void incoming(WebSocketFrame frame);
}

View File

@ -0,0 +1,11 @@
package org.eclipse.jetty.websocket.io;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Interface for dealing with outgoing frames.
*/
public interface OutgoingFrames
{
void output(WebSocketFrame frame);
}

View File

@ -15,11 +15,13 @@
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -31,6 +33,8 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
*/
public interface RawConnection extends WebSocketConnection
{
void close() throws IOException;
<C> void complete(FrameBytes<C> frameBytes);
void disconnect(boolean onlyOutput);
@ -45,6 +49,8 @@ public interface RawConnection extends WebSocketConnection
Parser getParser();
WebSocketPolicy getPolicy();
FrameQueue getQueue();
<C> void write(C context, Callback<C> callback, WebSocketFrame frame);

View File

@ -46,7 +46,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io
*/
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection, OutgoingFrames
{
static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final ThreadLocal<WebSocketAsyncConnection> CURRENT_CONNECTION = new ThreadLocal<WebSocketAsyncConnection>();
@ -61,17 +61,14 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
CURRENT_CONNECTION.set(connection);
}
final ByteBufferPool bufferPool;
private final ByteBufferPool bufferPool;
private final ScheduledExecutorService scheduler;
final Generator generator;
private final Generator generator;
private final Parser parser;
final WebSocketPolicy policy;
final FrameQueue queue;
// TODO: track extensions? (only those that need to operate at this level?)
// TODO: are extensions going to layer the endpoint?
// TODO: are extensions going to layer the connection?
private final WebSocketPolicy policy;
private final FrameQueue queue;
private List<ExtensionConfig> extensions;
private OutgoingFrames outgoingFramesHandler;
private boolean flushing;
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
@ -268,6 +265,12 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
fillInterested();
}
@Override
public void output(WebSocketFrame frame)
{
// TODO Auto-generated method stub
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.protocol;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.eclipse.jetty.util.QuotedStringTokenizer;
@ -85,6 +86,11 @@ public class ExtensionConfig
return str.toString();
}
public Set<String> getParameterKeys()
{
return parameters.keySet();
}
/**
* Initialize the parameters on this config from the other configuration.
* @param other the other configuration.

View File

@ -16,9 +16,6 @@
package org.eclipse.jetty.websocket.protocol;
import java.nio.ByteBuffer;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -27,68 +24,13 @@ import org.eclipse.jetty.websocket.api.MessageTooLargeException;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
/**
* Parsing of a frames in WebSocket land.
*/
public class Parser
{
public static interface Listener extends EventListener
{
public void onFrame(final WebSocketFrame frame);
public void onWebSocketException(WebSocketException e);
}
public static class ListenerList implements Listener
{
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
public void addListener(Listener listener)
{
listeners.add(listener);
}
@Override
public void onFrame(WebSocketFrame frame)
{
for (Listener listener : listeners)
{
try
{
listener.onFrame(frame);
}
catch (WebSocketException e)
{
throw e;
}
catch (Throwable t)
{
throw new WebSocketException(t);
}
}
}
@Override
public void onWebSocketException(WebSocketException e)
{
for (Listener listener : listeners)
{
listener.onWebSocketException(e);
}
}
public void removeListener(Listener listener)
{
listeners.remove(listener);
}
public void setListeners(List<Listener> lsnrs)
{
listeners.addAll(lsnrs);
}
}
private enum State
{
START,
@ -110,7 +52,7 @@ public class Parser
private int payloadLength;
private static final Logger LOG = Log.getLogger(Parser.class);
private Listener listener;
private IncomingFrames incomingFramesHandler;
private WebSocketPolicy policy;
public Parser(WebSocketPolicy wspolicy)
@ -136,14 +78,14 @@ public class Parser
switch (frame.getOpCode())
{
case CLOSE:
if (payloadLength == 1)
if (len == 1)
{
throw new ProtocolException("Invalid close frame payload length, [" + payloadLength + "]");
}
// fall thru
case PING:
case PONG:
if (payloadLength > WebSocketFrame.MAX_CONTROL_PAYLOAD)
if (len > WebSocketFrame.MAX_CONTROL_PAYLOAD)
{
throw new ProtocolException("Invalid control frame payload length, [" + payloadLength + "] cannot exceed ["
+ WebSocketFrame.MAX_CONTROL_PAYLOAD + "]");
@ -187,9 +129,9 @@ public class Parser
return amt;
}
public Listener getListener()
public IncomingFrames getIncomingFramesHandler()
{
return listener;
return incomingFramesHandler;
}
public WebSocketPolicy getPolicy()
@ -200,13 +142,13 @@ public class Parser
protected void notifyFrame(final WebSocketFrame f)
{
LOG.debug("Notify Frame: {}",f);
if (listener == null)
if (incomingFramesHandler == null)
{
return;
}
try
{
listener.onFrame(f);
incomingFramesHandler.incoming(f);
}
catch (WebSocketException e)
{
@ -222,11 +164,11 @@ public class Parser
protected void notifyWebSocketException(WebSocketException e)
{
LOG.debug(e);
if (listener == null)
if (incomingFramesHandler == null)
{
return;
}
listener.onWebSocketException(e);
incomingFramesHandler.incoming(e);
}
public void parse(ByteBuffer buffer)
@ -345,7 +287,7 @@ public class Parser
frame.setMasked((b & 0x80) != 0);
payloadLength = (byte)(0x7F & b);
if (payloadLength == 127)
if (payloadLength == 127) // 0x7F
{
// length 8 bytes (extended payload length)
payloadLength = 0;
@ -353,7 +295,7 @@ public class Parser
cursor = 8;
break; // continue onto next state
}
else if (payloadLength == 126)
else if (payloadLength == 126) // 0x7E
{
// length 2 bytes (extended payload length)
payloadLength = 0;
@ -505,22 +447,31 @@ public class Parser
return false;
}
public void setListener(Listener listener)
public void setIncomingFramesHandler(IncomingFrames incoming)
{
this.listener = listener;
this.incomingFramesHandler = incoming;
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("Parser [state=");
builder.append("Parser[");
if (incomingFramesHandler == null)
{
builder.append("NO_HANDLER");
}
else
{
builder.append(incomingFramesHandler.getClass().getSimpleName());
}
builder.append(",s=");
builder.append(state);
builder.append(", cursor=");
builder.append(",c=");
builder.append(cursor);
builder.append(", payloadLength=");
builder.append(",len=");
builder.append(payloadLength);
builder.append(", frame=");
builder.append(",f=");
builder.append(frame);
builder.append("]");
return builder.toString();

View File

@ -42,7 +42,7 @@ public class GeneratorParserRoundtripTest
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
@ -81,7 +81,7 @@ public class GeneratorParserRoundtripTest
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";

View File

@ -325,7 +325,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -359,7 +359,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -393,7 +393,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -427,7 +427,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -466,7 +466,7 @@ public class TestABCase1_1
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -503,7 +503,7 @@ public class TestABCase1_1
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -527,7 +527,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -340,7 +340,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -374,7 +374,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -408,7 +408,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -442,7 +442,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -477,7 +477,7 @@ public class TestABCase1_2
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -514,7 +514,7 @@ public class TestABCase1_2
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -538,7 +538,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -199,7 +199,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -229,7 +229,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -252,7 +252,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -283,7 +283,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -328,7 +328,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals("error should be returned for too large of ping payload",1,capture.getErrorCount(ProtocolException.class));

View File

@ -41,7 +41,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -63,7 +63,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -86,7 +86,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -108,7 +108,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;

View File

@ -71,7 +71,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -105,7 +105,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
@ -146,7 +146,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -205,7 +205,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -276,7 +276,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -346,7 +346,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));

View File

@ -60,7 +60,7 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(2);
socket.capture.assertEventStartsWith(0,"onWebSocketConnect");
@ -76,8 +76,8 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(makeBinaryFrame("Hello World",true));
driver.onFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
driver.incoming(makeBinaryFrame("Hello World",true));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
@ -94,10 +94,10 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(new WebSocketFrame(OpCode.PING).setPayload("PING"));
driver.onFrame(WebSocketFrame.text("Text Me"));
driver.onFrame(WebSocketFrame.binary().setPayload("Hello Bin"));
driver.onFrame(new CloseInfo(StatusCode.SHUTDOWN).asFrame());
driver.incoming(new WebSocketFrame(OpCode.PING).setPayload("PING"));
driver.incoming(WebSocketFrame.text("Text Me"));
driver.incoming(WebSocketFrame.binary().setPayload("Hello Bin"));
driver.incoming(new CloseInfo(StatusCode.SHUTDOWN).asFrame());
socket.capture.assertEventCount(6);
socket.capture.assertEventStartsWith(0,"onConnect(");
@ -117,8 +117,8 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(makeBinaryFrame("Hello World",true));
driver.onFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
driver.incoming(makeBinaryFrame("Hello World",true));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
@ -135,8 +135,8 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(WebSocketFrame.text("Hello World"));
driver.onFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
driver.incoming(WebSocketFrame.text("Hello World"));
driver.incoming(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onWebSocketConnect");

View File

@ -52,7 +52,7 @@ public class ClosePayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -23,12 +23,10 @@ import java.util.List;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.junit.Assert;
public class FrameParseCapture implements Parser.Listener
public class FrameParseCapture implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(FrameParseCapture.class);
private List<WebSocketFrame> frames = new ArrayList<>();
@ -94,15 +92,15 @@ public class FrameParseCapture implements Parser.Listener
}
@Override
public void onFrame(WebSocketFrame frame)
{
frames.add(frame);
}
@Override
public void onWebSocketException(WebSocketException e)
public void incoming(WebSocketException e)
{
LOG.warn(e);
errors.add(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
frames.add(frame);
}
}

View File

@ -37,7 +37,7 @@ public class ParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -42,7 +42,7 @@ public class PingPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -38,7 +38,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
ByteBuffer buf = ByteBuffer.allocate(16);
@ -82,7 +82,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -105,7 +105,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -135,7 +135,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -175,7 +175,7 @@ public class RFC6455ExamplesParserTest
policy.setBufferSize(80000);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -206,7 +206,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -229,7 +229,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -55,7 +55,7 @@ public class TextPayloadParserTest
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertHasErrors(MessageTooLargeException.class,1);
@ -93,7 +93,7 @@ public class TextPayloadParserTest
policy.setMaxPayloadSize(100000);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -128,7 +128,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -165,7 +165,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -192,7 +192,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -218,7 +218,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.setListener(capture);
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -16,9 +16,6 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.websocket.api.Extension;
public interface WebSocketHandshake
{
@ -27,8 +24,7 @@ public interface WebSocketHandshake
*
* @param request
* @param response
* @param extensions
* @param acceptedSubProtocol
*/
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response, List<Extension> extensions) throws IOException;
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response) throws IOException;
}

View File

@ -47,12 +47,9 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
@ -64,14 +61,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private final Queue<WebSocketAsyncConnection> connections = new ConcurrentLinkedQueue<WebSocketAsyncConnection>();
// TODO: replace with ExtensionRegistry in websocket-core
private final Map<String, Class<? extends Extension>> extensionClasses = new HashMap<>();
{
extensionClasses.put("identity",IdentityExtension.class);
extensionClasses.put("fragment",FragmentExtension.class);
extensionClasses.put("x-deflate-frame",DeflateFrameExtension.class);
}
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
{
handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
@ -86,7 +75,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache;
private final ByteBufferPool bufferPool;
private final ExtensionRegistry extensionRegistry;
private final WebSocketExtensionRegistry extensionRegistry;
private WebSocketCreator creator;
private Class<?> firstRegisteredClass;
@ -100,7 +89,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
this.basePolicy = policy;
this.methodsCache = new EventMethodsCache();
this.bufferPool = bufferPool;
this.extensionRegistry = new WebSocketExtensionRegistry();
this.extensionRegistry = new WebSocketExtensionRegistry(basePolicy,bufferPool);
this.creator = this;
// Create supportedVersions
@ -194,12 +183,9 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
return this.creator;
}
/**
* @return A modifiable map of extension name to extension class
*/
public Map<String, Class<? extends Extension>> getExtensionClassesMap()
public ExtensionRegistry getExtensionRegistry()
{
return extensionClasses;
return extensionRegistry;
}
/**
@ -296,6 +282,12 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
this.creator = creator;
}
private IncomingFrames setupExtensionChain(WebSocketEventDriver websocket, List<Extension> extensions)
{
// TODO Auto-generated method stub
return websocket;
}
/**
* Upgrade the request/response to a WebSocket Connection.
* <p>
@ -354,19 +346,12 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Initialize / Negotiate Extensions
List<Extension> extensions = initExtensions(request.getExtensions());
// TODO : bind extensions? layer extensions? how?
// TODO : wrap websocket with extension processing Parser.Listener list
Parser.ListenerList listenerList = new Parser.ListenerList();
listenerList.addListener(websocket);
connection.getParser().setListener(listenerList);
// TODO : connection.setWriteExtensions(extensions);
// TODO : implement endpoint.write() layer for outgoing extension frames.
IncomingFrames incoming = setupExtensionChain(websocket,extensions);
connection.getParser().setIncomingFramesHandler(incoming);
// Process (version specific) handshake response
LOG.debug("Handshake Response: {}",handshaker);
handshaker.doHandshakeResponse(request,response,extensions);
handshaker.doHandshakeResponse(request,response);
// Add connection
addConnection(connection);

View File

@ -16,9 +16,7 @@
package org.eclipse.jetty.websocket.server.handshake;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.server.ServletWebSocketRequest;
import org.eclipse.jetty.websocket.server.ServletWebSocketResponse;
import org.eclipse.jetty.websocket.server.WebSocketHandshake;
@ -34,7 +32,7 @@ public class HandshakeHixie76 implements WebSocketHandshake
public static final int VERSION = 0;
@Override
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response, List<Extension> extensions) throws IOException
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response) throws IOException
{
// TODO: implement the Hixie76 handshake?
throw new IOException("Not implemented yet");

View File

@ -16,12 +16,11 @@
package org.eclipse.jetty.websocket.server.handshake;
import java.io.IOException;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.server.ServletWebSocketRequest;
import org.eclipse.jetty.websocket.server.ServletWebSocketResponse;
import org.eclipse.jetty.websocket.server.WebSocketHandshake;
@ -35,7 +34,7 @@ public class HandshakeRFC6455 implements WebSocketHandshake
public static final int VERSION = 13;
@Override
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response, List<Extension> extensions) throws IOException
public void doHandshakeResponse(ServletWebSocketRequest request, ServletWebSocketResponse response) throws IOException
{
String key = request.getHeader("Sec-WebSocket-Key");
@ -54,11 +53,11 @@ public class HandshakeRFC6455 implements WebSocketHandshake
response.addHeader("Sec-WebSocket-Protocol",response.getAcceptedSubProtocol());
}
if (extensions != null)
if (request.getExtensions() != null)
{
for (Extension ext : extensions)
for (ExtensionConfig ext : request.getExtensions())
{
response.addHeader("Sec-WebSocket-Extensions",ext.getConfig().getParameterizedName());
response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
}
}

View File

@ -53,6 +53,7 @@ public class IdentityExtensionTest
client.addExtensions("identity;param=0");
client.addExtensions("identity;param=1, identity ; param = '2' ; other = ' some = value '");
client.setProtocols("onConnect");
client.setDebug(true);
try
{

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -61,7 +62,7 @@ public class WebSocketLoadRFC6455Test
private/* final */AsyncEndPoint _endp;
private final Generator _generator;
private final Parser _parser;
private final Parser.Listener _handler = new Parser.Listener()
private final IncomingFrames _handler = new IncomingFrames()
{
/*
* public void close(int code,String message) { }
@ -70,16 +71,15 @@ public class WebSocketLoadRFC6455Test
*/
@Override
public void onFrame(WebSocketFrame frame)
public void incoming(WebSocketException e)
{
// TODO Auto-generated method stub
}
@Override
public void onWebSocketException(WebSocketException e)
public void incoming(WebSocketFrame frame)
{
// TODO Auto-generated method stub
}
};
private volatile ByteBuffer _response;

View File

@ -49,6 +49,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.OpCode;
@ -65,11 +66,11 @@ import org.junit.Assert;
* This client is <u>NOT</u> intended to be performant or follow the websocket spec religiously. In fact, being able to deviate from the websocket spec at will
* is desired for this client to operate properly for the unit testing within this module.
*/
public class BlockheadClient implements Parser.Listener
public class BlockheadClient implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(BlockheadClient.class);
/** Set to true to disable timeouts (for debugging reasons) */
private static final boolean DEBUG = false;
private boolean debug = false;
private final URI destHttpURI;
private final URI destWebsocketURI;
private final ByteBufferPool bufferPool;
@ -103,7 +104,7 @@ public class BlockheadClient implements Parser.Listener
bufferPool = new StandardByteBufferPool(policy.getBufferSize());
generator = new UnitGenerator();
parser = new Parser(policy);
parser.setListener(this);
parser.setIncomingFramesHandler(this);
incomingFrameQueue = new LinkedBlockingDeque<>();
}
@ -198,6 +199,22 @@ public class BlockheadClient implements Parser.Listener
return destWebsocketURI;
}
@Override
public void incoming(WebSocketException e)
{
LOG.warn(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
LOG.debug("incoming({})",frame);
if (!incomingFrameQueue.offerLast(frame))
{
throw new RuntimeException("Unable to queue incoming frame: " + frame);
}
}
public void lookFor(String string) throws IOException
{
String orig = string;
@ -227,22 +244,6 @@ public class BlockheadClient implements Parser.Listener
}
}
@Override
public void onFrame(WebSocketFrame frame)
{
LOG.debug("onFrame({})",frame);
if (!incomingFrameQueue.offerLast(frame))
{
throw new RuntimeException("Unable to queue incoming frame: " + frame);
}
}
@Override
public void onWebSocketException(WebSocketException e)
{
LOG.warn(e);
}
public int read(ByteBuffer buf) throws IOException
{
int len = 0;
@ -285,7 +286,7 @@ public class BlockheadClient implements Parser.Listener
{
/* ignore */
}
if (!DEBUG && (System.currentTimeMillis() > expireOn))
if (!debug && (System.currentTimeMillis() > expireOn))
{
throw new TimeoutException("Timeout reading all of the desired frames");
}
@ -348,6 +349,11 @@ public class BlockheadClient implements Parser.Listener
writeRaw(req.toString());
}
public void setDebug(boolean flag)
{
this.debug = flag;
}
public void setProtocols(String protocols)
{
this.protocols = protocols;

View File

@ -23,11 +23,11 @@ import java.util.List;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class FrameParseCapture implements Parser.Listener
public class FrameParseCapture implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(FrameParseCapture.class);
private List<WebSocketFrame> frames = new ArrayList<>();
@ -95,15 +95,15 @@ public class FrameParseCapture implements Parser.Listener
}
@Override
public void onFrame(WebSocketFrame frame)
{
frames.add(frame);
}
@Override
public void onWebSocketException(WebSocketException e)
public void incoming(WebSocketException e)
{
LOG.warn(e);
errors.add(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
frames.add(frame);
}
}