Starting to piece together the Extensions

This commit is contained in:
Joakim Erdfelt 2012-07-11 11:42:26 -07:00
parent 0d1fee5906
commit 6757160a94
28 changed files with 584 additions and 162 deletions

View File

@ -13,11 +13,10 @@
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.extensions;
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public interface Extension
{
public ExtensionConfig getConfig();

View File

@ -0,0 +1,19 @@
package org.eclipse.jetty.websocket.api;
import java.util.Iterator;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public interface ExtensionRegistry extends Iterable<Class<? extends Extension>>
{
public boolean isAvailable(String name);
@Override
public Iterator<Class<? extends Extension>> iterator();
public Extension newInstance(ExtensionConfig config);
public void register(String name, Class<? extends Extension> extension);
public void unregister(String name);
}

View File

@ -1,3 +1,18 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.driver;
import java.io.IOException;
@ -5,6 +20,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
@ -14,16 +30,17 @@ import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.MessageInputStream;
import org.eclipse.jetty.websocket.io.MessageReader;
import org.eclipse.jetty.websocket.io.RawConnection;
import org.eclipse.jetty.websocket.io.StreamAppender;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Frame;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
@ -41,7 +58,7 @@ public class WebSocketEventDriver implements Parser.Listener
private final WebSocketPolicy policy;
private final EventMethods events;
private final ByteBufferPool bufferPool;
private WebSocketConnection connection;
private RawConnection connection;
private ByteBuffer activeMessage;
private StreamAppender activeStream;
@ -131,15 +148,20 @@ public class WebSocketEventDriver implements Parser.Listener
{
case CLOSE:
{
if (events.onClose == null)
{
// not interested in close events
return;
}
CloseInfo close = new CloseInfo(frame);
if (events.onClose != null)
{
events.onClose.call(websocket,connection,close.getStatusCode(),close.getReason());
}
throw new CloseException(close.getStatusCode(),close.getReason());
}
case PONG:
{
WebSocketFrame pong = new WebSocketFrame(OpCode.PONG);
pong.setPayload(frame.getPayload());
connection.write(null,new FutureCallback<Void>(),pong);
break;
}
case BINARY:
{
if (events.onBinary == null)
@ -325,7 +347,7 @@ public class WebSocketEventDriver implements Parser.Listener
* @param conn
* the connection
*/
public void setConnection(WebSocketConnection conn)
public void setConnection(RawConnection conn)
{
this.connection = conn;
}

View File

@ -15,6 +15,7 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.extensions;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;

View File

@ -0,0 +1,91 @@
package org.eclipse.jetty.websocket.extensions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class WebSocketExtensionRegistry implements ExtensionRegistry
{
private Map<String, Class<? extends Extension>> registry;
public WebSocketExtensionRegistry()
{
registry = new HashMap<String, Class<? extends Extension>>();
}
@Override
public boolean isAvailable(String name)
{
synchronized (registry)
{
return registry.containsKey(name);
}
}
@Override
public Iterator<Class<? extends Extension>> iterator()
{
List<Class<? extends Extension>> coll = new ArrayList<>();
synchronized (registry)
{
coll.addAll(registry.values());
return coll.iterator();
}
}
@Override
public Extension newInstance(ExtensionConfig config)
{
if (config == null)
{
return null;
}
String name = config.getName();
if (StringUtil.isBlank(name))
{
return null;
}
Class<? extends Extension> extClass = registry.get(name);
if (extClass == null)
{
return null;
}
try
{
Extension ext = extClass.newInstance();
ext.setConfig(config);
return ext;
}
catch (InstantiationException | IllegalAccessException e)
{
throw new WebSocketException("Cannot instantiate extension: " + extClass,e);
}
}
@Override
public void register(String name, Class<? extends Extension> extension)
{
synchronized (registry)
{
registry.put(name,extension);
}
}
@Override
public void unregister(String name)
{
synchronized (registry)
{
registry.remove(name);
}
}
}

View File

@ -24,7 +24,8 @@ public class ControlFrameBytes<C> extends FrameBytes<C>
if(frame.getOpCode() == OpCode.CLOSE)
{
// TODO: close the connection (no packet)
// Disconnect the connection (no more packets/frames)
connection.disconnect(false);
}
}

View File

@ -3,9 +3,11 @@ package org.eclipse.jetty.websocket.io;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Interface for working with connections in a raw way.
@ -16,6 +18,8 @@ public interface RawConnection extends WebSocketConnection
{
<C> void complete(FrameBytes<C> frameBytes);
void disconnect(boolean onlyOutput);
void flush();
ByteBufferPool getBufferPool();
@ -27,4 +31,6 @@ public interface RawConnection extends WebSocketConnection
Parser getParser();
FrameQueue getQueue();
<C> void write(C context, Callback<C> callback, WebSocketFrame frame);
}

View File

@ -1,3 +1,18 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
@ -93,6 +108,21 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
}
@Override
public void disconnect(boolean onlyOutput)
{
AsyncEndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
LOG.debug("Shutting down output {}",endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
LOG.debug("Closing {}",endPoint);
endPoint.close();
}
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
{
try
@ -270,7 +300,8 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private <C> void scheduleTimeout(FrameBytes<C> bytes)
{
if(policy.getMaxIdleTime()>0) {
if (policy.getMaxIdleTime() > 0)
{
bytes.task = scheduler.schedule(bytes,policy.getMaxIdleTime(),TimeUnit.MILLISECONDS);
}
}
@ -302,7 +333,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
CloseInfo close = new CloseInfo(statusCode,reason);
FutureCallback<Void> nop = new FutureCallback<>();
ControlFrameBytes<Void> frameBytes = new ControlFrameBytes<Void>(this,nop,null,close.asFrame());
queue.prepend(frameBytes);
queue.append(frameBytes);
flush();
}
@ -315,7 +346,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private <C> void write(ByteBuffer buffer, WebSocketAsyncConnection webSocketAsyncConnection, FrameBytes<C> frameBytes)
{
LOG.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
getEndPoint().write(frameBytes.context,frameBytes.callback,buffer);
getEndPoint().write(frameBytes.context,frameBytes,buffer);
}
/**
@ -376,4 +407,21 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
queue.append(bytes);
flush();
}
@Override
public <C> void write(C context, Callback<C> callback, WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
{
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
}
else
{
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
}
}
}

View File

@ -1,3 +1,18 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
@ -16,21 +31,78 @@ import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
;
/**
* Parsing of a frames in WebSocket land.
*/
public class Parser
{
public interface Listener extends EventListener
public static interface Listener extends EventListener
{
public void onFrame(final WebSocketFrame frame);
public void onWebSocketException(WebSocketException e);
}
public static class ListenerList implements Listener
{
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
public void addListener(Listener listener)
{
listeners.add(listener);
}
@Override
public void onFrame(WebSocketFrame frame)
{
for (Listener listener : listeners)
{
try
{
listener.onFrame(frame);
}
catch (WebSocketException e)
{
throw e;
}
catch (Throwable t)
{
throw new WebSocketException(t);
}
}
}
@Override
public void onWebSocketException(WebSocketException e)
{
for (Listener listener : listeners)
{
listener.onWebSocketException(e);
}
}
public void removeListener(Listener listener)
{
listeners.remove(listener);
}
public void setListeners(List<Listener> lsnrs)
{
listeners.addAll(lsnrs);
}
}
private enum State
{
START, FINOP, PAYLOAD_LEN, PAYLOAD_LEN_BYTES, MASK, MASK_BYTES, PAYLOAD
START,
FINOP,
PAYLOAD_LEN,
PAYLOAD_LEN_BYTES,
MASK,
MASK_BYTES,
PAYLOAD
}
// State specific
@ -43,7 +115,7 @@ public class Parser
private int payloadLength;
private static final Logger LOG = Log.getLogger(Parser.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private Listener listener;
private WebSocketPolicy policy;
public Parser(WebSocketPolicy wspolicy)
@ -55,11 +127,6 @@ public class Parser
this.policy = wspolicy;
}
public void addListener(Listener listener)
{
listeners.add(listener);
}
private void assertSanePayloadLength(long len)
{
LOG.debug("Payload Length: " + len);
@ -125,6 +192,11 @@ public class Parser
return amt;
}
public Listener getListener()
{
return listener;
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -133,8 +205,10 @@ public class Parser
protected void notifyFrame(final WebSocketFrame f)
{
LOG.debug("Notify Frame: {}",f);
for (Listener listener : listeners)
if (listener == null)
{
return;
}
try
{
listener.onFrame(f);
@ -149,19 +223,23 @@ public class Parser
notifyWebSocketException(new WebSocketException(t));
}
}
}
protected void notifyWebSocketException(WebSocketException e)
{
LOG.debug(e);
for (Listener listener : listeners)
if (listener == null)
{
listener.onWebSocketException(e);
return;
}
listener.onWebSocketException(e);
}
public void parse(ByteBuffer buffer)
{
if (buffer.remaining() <= 0)
{
return;
}
try
{
LOG.debug("Parsing {} bytes",buffer.remaining());
@ -202,6 +280,11 @@ public class Parser
*/
private boolean parseFrame(ByteBuffer buffer)
{
if (buffer.remaining() <= 0)
{
return false;
}
LOG.debug("Parsing {} bytes",buffer.remaining());
while (buffer.hasRemaining())
{
@ -427,9 +510,9 @@ public class Parser
return false;
}
public void removeListener(Listener listener)
public void setListener(Listener listener)
{
listeners.remove(listener);
this.listener = listener;
}
@Override

View File

@ -27,7 +27,7 @@ public class GeneratorParserRoundtripTest
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
@ -66,7 +66,7 @@ public class GeneratorParserRoundtripTest
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";

View File

@ -1,6 +1,6 @@
package org.eclipse.jetty.websocket.ab;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
@ -310,7 +310,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -344,7 +344,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -378,7 +378,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -412,7 +412,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -451,7 +451,7 @@ public class TestABCase1_1
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -488,7 +488,7 @@ public class TestABCase1_1
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -512,7 +512,7 @@ public class TestABCase1_1
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -1,6 +1,6 @@
package org.eclipse.jetty.websocket.ab;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
@ -325,7 +325,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -359,7 +359,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -393,7 +393,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -427,7 +427,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -462,7 +462,7 @@ public class TestABCase1_2
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -499,7 +499,7 @@ public class TestABCase1_2
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -523,7 +523,7 @@ public class TestABCase1_2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -1,6 +1,6 @@
package org.eclipse.jetty.websocket.ab;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -184,7 +184,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -214,7 +214,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -237,7 +237,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -268,7 +268,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -313,7 +313,7 @@ public class TestABCase2
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals("error should be returned for too large of ping payload",1,capture.getErrorCount(ProtocolException.class));

View File

@ -26,7 +26,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -48,7 +48,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -71,7 +71,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
@ -93,7 +93,7 @@ public class TestABCase4
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;

View File

@ -56,7 +56,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -90,7 +90,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
@ -131,7 +131,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -190,7 +190,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -261,7 +261,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
capture.assertNoErrors();
@ -331,7 +331,7 @@ public class TestABCase7_3
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(expected);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));

View File

@ -1,15 +1,35 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.rules.TestName;
public class LocalWebSocketConnection implements WebSocketConnection
public class LocalWebSocketConnection implements RawConnection, WebSocketConnection
{
private final String id;
@ -38,12 +58,57 @@ public class LocalWebSocketConnection implements WebSocketConnection
{
}
@Override
public <C> void complete(FrameBytes<C> frameBytes)
{
}
@Override
public void disconnect(boolean onlyOutput)
{
}
@Override
public void flush()
{
}
@Override
public ByteBufferPool getBufferPool()
{
return null;
}
@Override
public Executor getExecutor()
{
return null;
}
@Override
public Generator getGenerator()
{
return null;
}
@Override
public Parser getParser()
{
return null;
}
@Override
public WebSocketPolicy getPolicy()
{
return null;
}
@Override
public FrameQueue getQueue()
{
return null;
}
@Override
public InetSocketAddress getRemoteAddress()
{
@ -87,4 +152,9 @@ public class LocalWebSocketConnection implements WebSocketConnection
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, WebSocketFrame frame)
{
}
}

View File

@ -36,7 +36,7 @@ public class ClosePayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -21,7 +21,7 @@ public class ParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -26,7 +26,7 @@ public class PingPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -22,7 +22,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
ByteBuffer buf = ByteBuffer.allocate(16);
@ -66,7 +66,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -89,7 +89,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -119,7 +119,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -159,7 +159,7 @@ public class RFC6455ExamplesParserTest
policy.setBufferSize(80000);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -190,7 +190,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -213,7 +213,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -39,7 +39,7 @@ public class TextPayloadParserTest
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertHasErrors(MessageTooLargeException.class,1);
@ -77,7 +77,7 @@ public class TextPayloadParserTest
policy.setMaxPayloadSize(100000);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -112,7 +112,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -149,7 +149,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -176,7 +176,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();
@ -202,7 +202,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.setListener(capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -1,6 +1,6 @@
package org.eclipse.jetty.websocket.server;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.api.Extension;
/**
* Abstract WebSocket creator interface.

View File

@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.api.Extension;
public interface WebSocketHandshake
{

View File

@ -1,15 +1,18 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//========================================================================
package org.eclipse.jetty.websocket.server;
@ -37,15 +40,18 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
@ -80,6 +86,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache;
private final ByteBufferPool bufferPool;
private final ExtensionRegistry extensionRegistry;
private WebSocketCreator creator;
private Class<?> firstRegisteredClass;
@ -93,6 +100,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
this.basePolicy = policy;
this.methodsCache = new EventMethodsCache();
this.bufferPool = bufferPool;
this.extensionRegistry = new WebSocketExtensionRegistry();
this.creator = this;
// Create supportedVersions
@ -212,14 +220,13 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
for (ExtensionConfig cfg : requested)
{
Extension extension = newExtension(cfg.getName());
Extension extension = extensionRegistry.newInstance(cfg);
if (extension == null)
{
continue;
}
extension.setConfig(cfg);
LOG.debug("added {}",extension);
extensions.add(extension);
}
@ -251,24 +258,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
return true;
}
private Extension newExtension(String name)
{
try
{
Class<? extends Extension> extClass = extensionClasses.get(name);
if (extClass != null)
{
return extClass.newInstance();
}
}
catch (Exception e)
{
LOG.warn(e);
}
return null;
}
protected String[] parseProtocols(String protocol)
{
if (protocol == null)
@ -367,15 +356,17 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
List<Extension> extensions = initExtensions(request.getExtensions());
// TODO : bind extensions? layer extensions? how?
// TODO : wrap websocket with extension processing Parser.Listener list
connection.getParser().addListener(websocket);
Parser.ListenerList listenerList = new Parser.ListenerList();
listenerList.addListener(websocket);
connection.getParser().setListener(listenerList);
// TODO : connection.setWriteExtensions(extensions);
// TODO : implement endpoint.write() layer for outgoing extension frames.
// Process (version specific) handshake response
LOG.debug("Handshake Response: {}",handshaker);
handshaker.doHandshakeResponse(request,response,extensions);
LOG.debug("EndPoint: {}",endp);
LOG.debug("Handshake Complete: {}",connection);
// Add connection
addConnection(connection);

View File

@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.server.handshake;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.server.ServletWebSocketRequest;
import org.eclipse.jetty.websocket.server.ServletWebSocketResponse;
import org.eclipse.jetty.websocket.server.WebSocketHandshake;

View File

@ -1,3 +1,18 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server.handshake;
import java.io.IOException;
@ -5,7 +20,7 @@ import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.server.ServletWebSocketRequest;
import org.eclipse.jetty.websocket.server.ServletWebSocketResponse;

View File

@ -1,8 +1,24 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Queue;
@ -306,7 +322,15 @@ public class WebSocketServletRFCTest
WebSocketFrame bin = WebSocketFrame.binary(buf).setFin(true);
ByteBuffer bb = generator.generate(bin);
BufferUtil.flipToFlush(bb,0);
try
{
client.writeRaw(bb);
Assert.fail("Write should have failed due to terminated connection");
}
catch (SocketException e)
{
Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe"));
}
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove();
@ -340,7 +364,15 @@ public class WebSocketServletRFCTest
WebSocketFrame text = WebSocketFrame.text().setPayload(buf).setFin(true);
ByteBuffer bb = generator.generate(text);
BufferUtil.flipToFlush(bb,0);
try
{
client.writeRaw(bb);
Assert.fail("Write should have failed due to terminated connection");
}
catch (SocketException e)
{
Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe"));
}
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove();

View File

@ -1,3 +1,18 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server.blockhead;
import static org.hamcrest.Matchers.*;
@ -36,6 +51,8 @@ import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.UnitGenerator;
import org.junit.Assert;
@ -86,7 +103,7 @@ public class BlockheadClient implements Parser.Listener
bufferPool = new StandardByteBufferPool(policy.getBufferSize());
generator = new UnitGenerator();
parser = new Parser(policy);
parser.addListener(this);
parser.setListener(this);
incomingFrameQueue = new LinkedBlockingDeque<>();
}
@ -103,15 +120,21 @@ public class BlockheadClient implements Parser.Listener
public void close()
{
IO.close(in);
IO.close(out);
close(-1,null);
}
public void close(int statusCode, String message)
{
try
{
socket.close();
CloseInfo close = new CloseInfo(statusCode,message);
WebSocketFrame frame = close.asFrame();
LOG.debug("Issuing: {}",frame);
write(frame);
}
catch (IOException ignore)
catch (IOException e)
{
/* ignore */
LOG.debug(e);
}
}
@ -126,6 +149,21 @@ public class BlockheadClient implements Parser.Listener
in = socket.getInputStream();
}
private void disconnect()
{
LOG.debug("disconnect");
IO.close(in);
IO.close(out);
try
{
socket.close();
}
catch (IOException ignore)
{
/* ignore */
}
}
public String expectUpgradeResponse() throws IOException
{
String respHeader = readResponseHeader();
@ -373,6 +411,12 @@ public class BlockheadClient implements Parser.Listener
byte arr[] = BufferUtil.toArray(buf);
out.write(arr,0,arr.length);
out.flush();
if (frame.getOpCode() == OpCode.CLOSE)
{
// FIXME terminate the connection?
disconnect();
}
}
public void writeRaw(ByteBuffer buf) throws IOException