New ExtensionStack to manage Extensions easier and more consistently

This commit is contained in:
Joakim Erdfelt 2012-11-12 16:02:05 -07:00
parent 8548331735
commit 884f1a3eff
17 changed files with 765 additions and 142 deletions

View File

@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -47,14 +46,13 @@ import org.eclipse.jetty.util.UrlEncoded;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -79,7 +77,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session<Obje
private String negotiatedSubprotocol;
private long timeout;
private Map<String, String[]> parameterMap = new HashMap<>();
private List<ExtensionConfig> extensionConfigs = new ArrayList<>();
private WebSocketRemoteEndpoint remote;
private IncomingFrames incomingHandler;
private OutgoingFrames outgoingHandler;
@ -139,6 +136,31 @@ public class WebSocketSession extends ContainerLifeCycle implements Session<Obje
connection.close(statusCode,reason);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out,indent);
out.append(indent).append(" +- incomingHandler : ");
if (incomingHandler instanceof Dumpable)
{
((Dumpable)incomingHandler).dump(out,indent + " ");
}
else
{
out.append(incomingHandler.toString()).append('\n');
}
out.append(indent).append(" +- outgoingHandler : ");
if (outgoingHandler instanceof Dumpable)
{
((Dumpable)outgoingHandler).dump(out,indent + " ");
}
else
{
out.append(outgoingHandler.toString()).append('\n');
}
}
public LogicalConnection getConnection()
{
return connection;
@ -330,44 +352,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session<Obje
throw new WebSocketException("Cannot Open WebSocketSession, Already open");
}
// Initialize extensions
LOG.debug("Extension Configs={}",extensionConfigs);
List<Extension> extensions = new ArrayList<>();
for (ExtensionConfig config : extensionConfigs)
{
Extension ext = extensionFactory.newInstance(config);
extensions.add(ext);
LOG.debug("Adding Extension: {}",ext);
}
// Wire up Extensions
if (extensions.size() > 0)
{
Iterator<Extension> extIter;
// Connect outgoings
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextOutgoingFrames(outgoingHandler);
outgoingHandler = ext;
}
// Connect incomings
Collections.reverse(extensions);
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextIncomingFrames(incomingHandler);
incomingHandler = ext;
}
}
addBean(incomingHandler);
addBean(outgoingHandler);
// Connect remote
remote = new WebSocketRemoteEndpoint(connection,outgoingHandler);
@ -422,27 +406,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session<Obje
this.maximumMessageSize = length;
}
public void setNegotiatedExtensionConfigs(List<ExtensionConfig> extensions)
{
this.negotiatedExtensions.clear();
this.extensionConfigs.clear();
for (ExtensionConfig config : extensions)
{
this.extensionConfigs.add(config);
this.negotiatedExtensions.add(config.getParameterizedName());
}
}
public void setNegotiatedExtensions(List<String> negotiatedExtensions)
{
this.negotiatedExtensions = negotiatedExtensions;
this.extensionConfigs.clear();
for (String negotiatedExtension : negotiatedExtensions)
{
this.extensionConfigs.add(ExtensionConfig.parse(negotiatedExtension));
}
this.negotiatedExtensions.clear();
this.negotiatedExtensions.addAll(negotiatedExtensions);
}
public void setNegotiatedSubprotocol(String negotiatedSubprotocol)

View File

@ -196,4 +196,10 @@ public class AnnotatedEventDriver extends EventDriver
events.onText.call(websocket,session,message);
}
}
@Override
public String toString()
{
return String.format("%s[%s]", this.getClass().getSimpleName(), websocket);
}
}

View File

@ -24,6 +24,9 @@ import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -35,7 +38,8 @@ import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
public abstract class AbstractExtension implements Extension
@ManagedObject("Abstract Extension")
public abstract class AbstractExtension extends ContainerLifeCycle implements Extension
{
private final Logger log;
private WebSocketPolicy policy;
@ -50,6 +54,22 @@ public abstract class AbstractExtension implements Extension
log = Log.getLogger(this.getClass());
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out, indent);
// incoming
dumpWithHeading(out, indent, "incoming", this.nextIncoming);
dumpWithHeading(out, indent, "outgoing", this.nextOutgoing);
}
protected void dumpWithHeading(Appendable out, String indent, String heading, Object bean) throws IOException
{
out.append(indent).append(" +- ");
out.append(heading).append(" : ");
out.append(bean.toString());
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
@ -72,6 +92,18 @@ public abstract class AbstractExtension implements Extension
return config.getName();
}
@ManagedAttribute(name = "Next Incoming Frame Handler", readonly = true)
public IncomingFrames getNextIncoming()
{
return nextIncoming;
}
@ManagedAttribute(name = "Next Outgoing Frame Handler", readonly = true)
public OutgoingFrames getNextOutgoing()
{
return nextOutgoing;
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -191,6 +223,6 @@ public abstract class AbstractExtension implements Extension
@Override
public String toString()
{
return String.format("%s[%s]",config.getName(),config.getParameterizedName());
return String.format("%s[%s]",this.getClass().getSimpleName(),config.getParameterizedName());
}
}

View File

@ -0,0 +1,246 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.Parser;
/**
* Represents the stack of Extensions.
*/
@ManagedObject("Extension Stack")
public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(ExtensionStack.class);
private final ExtensionFactory factory;
private List<Extension> extensions;
private IncomingFrames nextIncoming;
private OutgoingFrames nextOutgoing;
public ExtensionStack(ExtensionFactory factory)
{
this.factory = factory;
}
public void configure(Generator generator)
{
generator.configureFromExtensions(extensions);
}
public void configure(Parser parser)
{
parser.configureFromExtensions(extensions);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
LOG.debug("doStart");
// Wire up Extensions
if ((extensions != null) && (extensions.size() > 0))
{
ListIterator<Extension> eiter = extensions.listIterator();
// Connect outgoings
while (eiter.hasNext())
{
Extension ext = eiter.next();
ext.setNextOutgoingFrames(nextOutgoing);
nextOutgoing = ext;
}
// Connect incomings
while (eiter.hasPrevious())
{
Extension ext = eiter.previous();
ext.setNextIncomingFrames(nextIncoming);
nextIncoming = ext;
}
}
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out,indent);
IncomingFrames websocket = getLastIncoming();
OutgoingFrames network = getLastOutgoing();
out.append(indent).append(" +- Stack\n");
out.append(indent).append(" +- Network : ").append(network.toString()).append('\n');
for (Extension ext : extensions)
{
out.append(indent).append(" +- Extension: ").append(ext.toString()).append('\n');
}
out.append(indent).append(" +- Websocket: ").append(websocket.toString()).append('\n');
}
@ManagedAttribute(name = "Extension List", readonly = true)
public List<Extension> getExtensions()
{
return extensions;
}
private IncomingFrames getLastIncoming()
{
IncomingFrames last = nextIncoming;
boolean done = false;
while (!done)
{
if (last instanceof AbstractExtension)
{
last = ((AbstractExtension)last).getNextIncoming();
}
else
{
done = true;
}
}
return last;
}
private OutgoingFrames getLastOutgoing()
{
OutgoingFrames last = nextOutgoing;
boolean done = false;
while (!done)
{
if (last instanceof AbstractExtension)
{
last = ((AbstractExtension)last).getNextOutgoing();
}
else
{
done = true;
}
}
return last;
}
/**
* Get the list of negotiated extensions, each entry being a full "name; params" extension configuration
*
* @return list of negotiated extensions
*/
public List<String> getNegotiatedExtensions()
{
List<String> ret = new ArrayList<>();
if (extensions == null)
{
return ret;
}
for (Extension ext : extensions)
{
ret.add(ext.getConfig().getParameterizedName());
}
return ret;
}
@ManagedAttribute(name = "Next Incoming Frames Handler", readonly = true)
public IncomingFrames getNextIncoming()
{
return nextIncoming;
}
@ManagedAttribute(name = "Next Outgoing Frames Handler", readonly = true)
public OutgoingFrames getNextOutgoing()
{
return nextOutgoing;
}
@Override
public void incomingError(WebSocketException e)
{
nextIncoming.incomingError(e);
}
@Override
public void incomingFrame(Frame frame)
{
nextIncoming.incomingFrame(frame);
}
/**
* Perform the extension negotiation.
* <p>
* For the list of negotiated extensions, use {@link #getNegotiatedExtensions()}
*
* @param configs
* the configurations being requested
*/
public void negotiate(List<ExtensionConfig> configs)
{
LOG.debug("Extension Configs={}",configs);
this.extensions = new ArrayList<>();
for (ExtensionConfig config : configs)
{
Extension ext = factory.newInstance(config);
extensions.add(ext);
LOG.debug("Adding Extension: {}",ext);
}
addBean(extensions);
}
@Override
public Future<SendResult> outgoingFrame(Frame frame) throws IOException
{
return nextOutgoing.outgoingFrame(frame);
}
public void setNextIncoming(IncomingFrames nextIncoming)
{
this.nextIncoming = nextIncoming;
}
public void setNextOutgoing(OutgoingFrames nextOutgoing)
{
this.nextOutgoing = nextOutgoing;
}
@Override
public String toString()
{
return String.format("ExtensionStack[extensions=%s]",extensions);
}
}

View File

@ -24,15 +24,22 @@ import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
@ManagedObject("Identity Extension")
public class IdentityExtension extends AbstractExtension
{
private String id;
public String getParam(String key)
{
return getConfig().getParameter(key,"?");
}
@Override
public void incomingError(WebSocketException e)
{
@ -60,10 +67,17 @@ public class IdentityExtension extends AbstractExtension
super.setConfig(config);
StringBuilder s = new StringBuilder();
s.append(config.getName());
s.append("@").append(Integer.toHexString(hashCode()));
s.append("[");
boolean delim = false;
for (String param : config.getParameterKeys())
{
s.append(';').append(param).append('=').append(QuotedStringTokenizer.quoteIfNeeded(config.getParameter(param,""),";="));
if (delim)
{
s.append(';');
}
s.append(param).append('=').append(QuotedStringTokenizer.quoteIfNeeded(config.getParameter(param,""),";="));
delim = true;
}
s.append("]");
id = s.toString();

View File

@ -415,7 +415,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
bytes = new DataFrameBytes(this,frame);
}
future = bytes;
future = new JavaxWebsocketFuture(bytes);
scheduleTimeout(bytes);

View File

@ -20,8 +20,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -40,7 +38,7 @@ public class ControlFrameBytes extends FrameBytes
}
@Override
public void completed(SendResult context)
public void completed(Void context)
{
LOG.debug("completed() - frame: {}",frame);
connection.getBufferPool().release(buffer);

View File

@ -20,8 +20,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -37,7 +35,7 @@ public class DataFrameBytes extends FrameBytes
}
@Override
public void completed(SendResult result)
public void completed(Void result)
{
if (LOG.isDebugEnabled())
{

View File

@ -21,8 +21,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
@ -30,7 +28,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public abstract class FrameBytes extends FutureCallback<SendResult> implements Runnable
public abstract class FrameBytes extends FutureCallback<Void> implements Runnable
{
private final static Logger LOG = Log.getLogger(FrameBytes.class);
protected final AbstractWebSocketConnection connection;
@ -54,7 +52,7 @@ public abstract class FrameBytes extends FutureCallback<SendResult> implements R
}
@Override
public void completed(SendResult v)
public void completed(Void v)
{
super.completed(v);
if (LOG.isDebugEnabled())
@ -67,7 +65,7 @@ public abstract class FrameBytes extends FutureCallback<SendResult> implements R
}
@Override
public void failed(SendResult v, Throwable x)
public void failed(Void v, Throwable x)
{
super.failed(v,x);
if (x instanceof EofException)

View File

@ -0,0 +1,82 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.websocket.SendResult;
public class JavaxWebsocketFuture implements Future<SendResult>
{
private final FrameBytes bytes;
public JavaxWebsocketFuture(FrameBytes bytes)
{
this.bytes = bytes;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return this.bytes.cancel(mayInterruptIfRunning);
}
@Override
public SendResult get() throws InterruptedException, ExecutionException
{
try
{
bytes.get();
return new SendResult();
}
catch (ExecutionException e)
{
return new SendResult(e.getCause());
}
}
@Override
public SendResult get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
try
{
bytes.get(timeout,unit);
return new SendResult();
}
catch (ExecutionException e)
{
return new SendResult(e.getCause());
}
}
@Override
public boolean isCancelled()
{
return this.bytes.isCancelled();
}
@Override
public boolean isDone()
{
return this.bytes.isDone();
}
}

View File

@ -26,8 +26,8 @@ import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ DeflateCompressionMethodTest.class, PerMessageCompressionExtensionTest.class, FragmentExtensionTest.class, IdentityExtensionTest.class,
WebkitDeflateFrameExtensionTest.class })
{ ExtensionStackTest.class, DeflateCompressionMethodTest.class, PerMessageCompressionExtensionTest.class, FragmentExtensionTest.class,
IdentityExtensionTest.class, WebkitDeflateFrameExtensionTest.class })
public class AllTests
{
/* nothing to do here, its all done in the annotations */

View File

@ -0,0 +1,57 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
/**
* Dummy implementation of {@link IncomingFrames} used for testing
*/
public class DummyIncomingFrames implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(DummyIncomingFrames.class);
private final String id;
public DummyIncomingFrames(String id)
{
this.id = id;
}
@Override
public void incomingError(WebSocketException e)
{
LOG.debug("incomingError()",e);
}
@Override
public void incomingFrame(Frame frame)
{
LOG.debug("incomingFrame({})",frame);
}
@Override
public String toString()
{
return String.format("%s@%x[%s]",this.getClass().getSimpleName(),hashCode(),id);
}
}

View File

@ -0,0 +1,57 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions;
import java.io.IOException;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.FinishedFuture;
/**
* Dummy implementation of {@link OutgoingFrames} used for testing
*/
public class DummyOutgoingFrames implements OutgoingFrames
{
private static final Logger LOG = Log.getLogger(DummyOutgoingFrames.class);
private final String id;
public DummyOutgoingFrames(String id)
{
this.id = id;
}
@Override
public Future<SendResult> outgoingFrame(Frame frame) throws IOException
{
LOG.debug("outgoingFrame({})",frame);
return FinishedFuture.INSTANCE;
}
@Override
public String toString()
{
return String.format("%s@%x[%s]",this.getClass().getSimpleName(),hashCode(),id);
}
}

View File

@ -0,0 +1,163 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions;
import static org.hamcrest.Matchers.*;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension;
import org.junit.Assert;
import org.junit.Test;
public class ExtensionStackTest
{
private static final Logger LOG = Log.getLogger(ExtensionStackTest.class);
@SuppressWarnings("unchecked")
private <T> T assertIsExtension(String msg, Object obj, Class<T> clazz)
{
if (clazz.isAssignableFrom(obj.getClass()))
{
return (T)obj;
}
Assert.assertEquals("Expected " + msg + " class",clazz.getName(),obj.getClass().getName());
return null;
}
private ExtensionStack createExtensionStack()
{
WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
ByteBufferPool bufferPool = new ArrayByteBufferPool();
ExtensionFactory factory = new WebSocketExtensionFactory(policy,bufferPool);
return new ExtensionStack(factory);
}
@Test
public void testStartIdentity() throws Exception
{
ExtensionStack stack = createExtensionStack();
try
{
// 1 extension
List<ExtensionConfig> configs = new ArrayList<>();
configs.add(ExtensionConfig.parse("identity"));
stack.negotiate(configs);
// Setup Listeners
DummyIncomingFrames session = new DummyIncomingFrames("Session");
DummyOutgoingFrames connection = new DummyOutgoingFrames("Connection");
stack.setNextOutgoing(connection);
stack.setNextIncoming(session);
// Start
stack.start();
// Dump
LOG.debug("{}",stack.dump());
// Should be no change to handlers
Extension actualIncomingExtension = assertIsExtension("Incoming",stack.getNextIncoming(),IdentityExtension.class);
Extension actualOutgoingExtension = assertIsExtension("Outgoing",stack.getNextOutgoing(),IdentityExtension.class);
Assert.assertEquals(actualIncomingExtension,actualOutgoingExtension);
}
finally
{
stack.stop();
}
}
@Test
public void testStartIdentityTwice() throws Exception
{
ExtensionStack stack = createExtensionStack();
try
{
// 1 extension
List<ExtensionConfig> configs = new ArrayList<>();
configs.add(ExtensionConfig.parse("identity; id=A"));
configs.add(ExtensionConfig.parse("identity; id=B"));
stack.negotiate(configs);
// Setup Listeners
DummyIncomingFrames session = new DummyIncomingFrames("Session");
DummyOutgoingFrames connection = new DummyOutgoingFrames("Connection");
stack.setNextOutgoing(connection);
stack.setNextIncoming(session);
// Start
stack.start();
// Dump
LOG.debug("{}",stack.dump());
// Should be no change to handlers
IdentityExtension actualIncomingExtension = assertIsExtension("Incoming",stack.getNextIncoming(),IdentityExtension.class);
IdentityExtension actualOutgoingExtension = assertIsExtension("Outgoing",stack.getNextOutgoing(),IdentityExtension.class);
Assert.assertThat("Incoming[identity].id",actualIncomingExtension.getParam("id"),is("A"));
Assert.assertThat("Outgoing[identity].id",actualOutgoingExtension.getParam("id"),is("B"));
}
finally
{
stack.stop();
}
}
@Test
public void testStartNothing() throws Exception
{
ExtensionStack stack = createExtensionStack();
try
{
// intentionally empty
List<ExtensionConfig> configs = new ArrayList<>();
stack.negotiate(configs);
// Setup Listeners
DummyIncomingFrames session = new DummyIncomingFrames("Session");
DummyOutgoingFrames connection = new DummyOutgoingFrames("Connection");
stack.setNextOutgoing(connection);
stack.setNextIncoming(session);
// Start
stack.start();
// Dump
LOG.debug("{}",stack.dump());
// Should be no change to handlers
Assert.assertEquals("Incoming Handler",stack.getNextIncoming(),session);
Assert.assertEquals("Outgoing Handler",stack.getNextOutgoing(),connection);
}
finally
{
stack.stop();
}
}
}

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.websocket.LEVEL=WARN
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG

View File

@ -47,12 +47,11 @@ import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
@ -362,6 +361,10 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
return false;
}
// Initialize / Negotiate Extensions
ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
extensionStack.negotiate(request.getExtensions());
// Create connection
UpgradeContext context = getActiveUpgradeContext();
LogicalConnection connection = context.getConnection();
@ -372,33 +375,53 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
connection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
connection = wsConnection;
extensionStack.configure(wsConnection.getParser());
extensionStack.configure(wsConnection.getGenerator());
LOG.debug("HttpConnection: {}",http);
LOG.debug("AsyncWebSocketConnection: {}",connection);
}
// Initialize / Negotiate Extensions
// Setup Session
WebSocketSession session = new WebSocketSession(request.getRequestURI(),driver,connection);
session.setPolicy(getPolicy().clonePolicy());
session.setNegotiatedSubprotocol(response.getAcceptedSubProtocol());
session.setNegotiatedExtensionConfigs(request.getExtensions());
session.setExtensionFactory(extensionFactory);
session.setNegotiatedExtensions(extensionStack.getNegotiatedExtensions());
connection.setSession(session);
// Start with default routing.
IncomingFrames incoming = session;
OutgoingFrames outgoing = connection;
// Setup Incoming Routing
connection.setNextIncomingFrames(extensionStack);
extensionStack.setNextIncoming(session);
// if (extensions != null)
// {
// // FIXME connection.configureFromExtensions(extensions);
// }
// Setup Outgoing Routing
session.setOutgoingHandler(extensionStack);
extensionStack.setNextOutgoing(connection);
// configure session for outgoing flows
session.setOutgoingHandler(outgoing);
// configure connection for incoming flows
connection.setNextIncomingFrames(incoming);
// Start Components
try
{
session.start();
}
catch (Exception e)
{
throw new IOException("Unable to start Session",e);
}
try
{
extensionStack.start();
}
catch (Exception e)
{
throw new IOException("Unable to start Extension Stack",e);
}
if (LOG.isDebugEnabled())
{
LOG.debug("{}",extensionStack.dump());
}
// Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);

View File

@ -34,8 +34,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -57,7 +55,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -67,6 +64,7 @@ import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.server.helper.FinishedFuture;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
@ -96,7 +94,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
private final Generator generator;
private final Parser parser;
private final IncomingFramesCapture incomingFrames;
private final WebSocketExtensionFactory extensionRegistry;
private final WebSocketExtensionFactory extensionFactory;
private Socket socket;
private OutputStream out;
@ -108,7 +106,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
{ (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF };
private int timeout = 1000;
private AtomicInteger parseCount;
private IncomingFrames incoming = this;
private OutgoingFrames outgoing = this;
public BlockheadClient(URI destWebsocketURI) throws URISyntaxException
@ -134,7 +131,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
this.incomingFrames = new IncomingFramesCapture();
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
this.extensionFactory = new WebSocketExtensionFactory(policy,bufferPool);
}
public void addExtensions(String xtension)
@ -211,41 +208,30 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
Assert.assertThat("Response Header Connection",respHeader,containsString("Connection: Upgrade\r\n"));
// collect extensions configured in response header
List<Extension> extensions = getExtensions(respHeader);
List<ExtensionConfig> configs = getExtensionConfigs(respHeader);
ExtensionStack extensionStack = new ExtensionStack(this.extensionFactory);
extensionStack.negotiate(configs);
// Start with default routing
incoming = this;
outgoing = this;
extensionStack.setNextIncoming(this);
extensionStack.setNextOutgoing(this);
// Connect extensions
if (extensions != null)
// Configure Parser / Generator
extensionStack.configure(parser);
extensionStack.configure(generator);
// Start Stack
try
{
generator.configureFromExtensions(extensions);
parser.configureFromExtensions(extensions);
Iterator<Extension> extIter;
// Connect outgoings
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
}
// Connect incomings
Collections.reverse(extensions);
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextIncomingFrames(incoming);
incoming = ext;
}
extensionStack.start();
}
catch (Exception e)
{
throw new IOException("Unable to start Extension Stack");
}
// configure parser
parser.setIncomingFramesHandler(incoming);
parser.setIncomingFramesHandler(extensionStack);
return respHeader;
}
@ -255,14 +241,9 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
out.flush();
}
public List<String> getExtensions()
private List<ExtensionConfig> getExtensionConfigs(String respHeader)
{
return extensions;
}
private List<Extension> getExtensions(String respHeader)
{
List<Extension> extensions = new ArrayList<>();
List<ExtensionConfig> configs = new ArrayList<>();
Pattern expat = Pattern.compile("Sec-WebSocket-Extensions: (.*)\r",Pattern.CASE_INSENSITIVE);
Matcher mat = expat.matcher(respHeader);
@ -273,14 +254,15 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
LOG.debug("Found Extension Response: {}",econf);
ExtensionConfig config = ExtensionConfig.parse(econf);
Extension ext = extensionRegistry.newInstance(config);
if (ext != null)
{
extensions.add(ext);
}
configs.add(config);
offset = mat.end(1);
}
return configs;
}
public List<String> getExtensions()
{
return extensions;
}