Refactoring .core to .common and updating for .api

This commit is contained in:
Joakim Erdfelt 2012-11-05 17:18:44 -07:00
parent c1ce7780a6
commit 053f91c8a0
130 changed files with 1554 additions and 1128 deletions

View File

@ -14,13 +14,19 @@
<bundle-symbolic-name>${project.groupId}.api</bundle-symbolic-name>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.drafts</groupId>
<artifactId>javax.net.websocket-api</artifactId>
<version>0.006.0.EDR-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -21,9 +21,11 @@ package org.eclipse.jetty.websocket.api;
import java.util.List;
import java.util.Map;
import javax.net.websocket.HandshakeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
public interface UpgradeRequest
public interface UpgradeRequest extends HandshakeRequest
{
public void addExtensions(String... extConfigs);
@ -33,6 +35,7 @@ public interface UpgradeRequest
public String getHeader(String name);
@Override
public Map<String, List<String>> getHeaders();
public String getHost();
@ -43,8 +46,7 @@ public interface UpgradeRequest
public String getOrigin();
public Map<String, String[]> getParameterMap();
@Override
public String getQueryString();
public String getRemoteURI();

View File

@ -23,9 +23,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.net.websocket.HandshakeResponse;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
public interface UpgradeResponse
public interface UpgradeResponse extends HandshakeResponse
{
public void addHeader(String name, String value);

View File

@ -24,6 +24,8 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
/**
* Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>.
*/
@ -51,42 +53,14 @@ public interface WebSocketConnection
* @see StatusCode
*/
public void close(int statusCode, String reason);
/**
* Is the connection open.
*
* @return true if open
*/
public boolean isOpen();
/**
* Suspend a the incoming read events on the connection.
*
* @return the suspend token suitable for resuming the reading of data on the connection.
*/
SuspendToken suspend();
/**
* Get the address of the remote side.
*
* @return the remote side address
*/
public InetSocketAddress getRemoteAddress();
/**
* Get the address of the local side.
*
* @return the local side address
*/
public InetSocketAddress getLocalAddress();
/**
* Get the Request URI
*
* @return the requested URI
*/
public URI getRequestURI();
/**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.
*
@ -94,6 +68,20 @@ public interface WebSocketConnection
*/
WebSocketPolicy getPolicy();
/**
* Get the address of the remote side.
*
* @return the remote side address
*/
public InetSocketAddress getRemoteAddress();
/**
* Get the Request URI
*
* @return the requested URI
*/
public URI getRequestURI();
/**
* Get the SubProtocol in use for this connection.
*
@ -101,6 +89,13 @@ public interface WebSocketConnection
*/
String getSubProtocol();
/**
* Is the connection open.
*
* @return true if open
*/
public boolean isOpen();
/**
* Send a single ping messages.
* <p>
@ -108,21 +103,28 @@ public interface WebSocketConnection
* <p>
* Use @OnWebSocketFrame and monitor Pong frames
*/
<C> Future<C> ping(C context, byte payload[]) throws IOException;
Future<SendResult> ping(byte payload[]) throws IOException;
/**
* Suspend a the incoming read events on the connection.
*
* @return the suspend token suitable for resuming the reading of data on the connection.
*/
SuspendToken suspend();
/**
* Send a a binary message.
* <p>
* NIO style with callbacks, allows for concurrent results of the write operation.
*/
<C> Future<C> write(C context, byte buf[], int offset, int len) throws IOException;
Future<SendResult> write(byte buf[], int offset, int len) throws IOException;
/**
* Send a a binary message.
* <p>
* NIO style with callbacks, allows for concurrent results of the write operation.
*/
<C> Future<C> write(C context, ByteBuffer buffer) throws IOException;
Future<SendResult> write(ByteBuffer buffer) throws IOException;
/**
* Send a series of text messages.
@ -130,5 +132,5 @@ public interface WebSocketConnection
* NIO style with callbacks, allows for concurrent results of the entire write operation. (Callback is only called once at the end of processing all of the
* messages)
*/
<C> Future<C> write(C context, String message) throws IOException;
Future<SendResult> write(String message) throws IOException;
}

View File

@ -18,26 +18,34 @@
package org.eclipse.jetty.websocket.api.extensions;
import javax.net.websocket.extensions.FrameHandler;
/**
* Interface for WebSocket Extensions.
* <p>
* That work is performed by the two {@link FrameHandler} implementations for incoming and outgoing frame handling.
*/
public interface Extension
public interface Extension extends javax.net.websocket.extensions.Extension
{
/**
* Create an instance of a Incoming {@link FrameHandler} for working with frames destined for the End User WebSocket Object.
*
* @param the
* incoming {@link FrameHandler} to wrap
* @return the frame handler for incoming frames.
*/
public FrameHandler createIncomingFrameHandler();
@Override
public FrameHandler createIncomingFrameHandler(FrameHandler incoming);
/**
* Create an instance of a Outgoing {@link FrameHandler} for working with frames destined for the Network Bytes Layer.
*
* @param the
* outgoing {@link FrameHandler} to wrap
* @return the frame handler for outgoing frames.
*/
public FrameHandler createOutgoingFrameHandler();
@Override
public FrameHandler createOutgoingFrameHandler(FrameHandler outgoing);
/**
* The active configuration for this extension.
@ -51,5 +59,6 @@ public interface Extension
* <p>
* Also known as the <a href="https://tools.ietf.org/html/rfc6455#section-9.1"><code>extension-token</code> per Section 9.1. Negotiating Extensions</a>.
*/
@Override
public String getName();
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.api.extensions;
import java.util.Map;
import java.util.Set;
/**
@ -35,6 +36,13 @@ public interface ExtensionConfig
public Set<String> getParameterKeys();
/**
* Return parameters in way similar to how {@link javax.net.websocket.extensions.Extension#getParameters()} works.
*
* @return the parameter map
*/
public Map<String, String> getParameters();
public void setParameter(String key, int value);
public void setParameter(String key, String value);

View File

@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
/**
* An immutable websocket frame.
*/
public interface Frame
public interface Frame extends javax.net.websocket.extensions.Frame
{
public static enum Type
{

View File

@ -1,27 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.api.extensions;
/**
* Represents a Handler for a Frame (regardless of direction)
*/
public interface FrameHandler
{
public void handleFrame(Frame frame);
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -32,8 +34,6 @@ import org.eclipse.jetty.websocket.api.WebSocketException;
*/
public class WebSocketBlockingConnection
{
private static final String CONTEXT_BINARY = "BLOCKING_BINARY";
private static final String CONTEXT_TEXT = "BLOCKING_TEXT";
private final WebSocketConnection conn;
public WebSocketBlockingConnection(WebSocketConnection conn)
@ -50,8 +50,12 @@ public class WebSocketBlockingConnection
{
try
{
Future<String> blocker = conn.write(CONTEXT_BINARY,data,offset,length);
blocker.get(); // block till finished
Future<SendResult> blocker = conn.write(data,offset,length);
SendResult result = blocker.get(); // block till finished
if (result.getException() != null)
{
throw new WebSocketException(result.getException());
}
}
catch (InterruptedException e)
{
@ -72,8 +76,12 @@ public class WebSocketBlockingConnection
{
try
{
Future<String> blocker = conn.write(CONTEXT_TEXT,message);
blocker.get(); // block till finished
Future<SendResult> blocker = conn.write(message);
SendResult result = blocker.get(); // block till finished
if (result.getException() != null)
{
throw new WebSocketException(result.getException());
}
}
catch (InterruptedException e)
{

View File

@ -16,6 +16,12 @@
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.drafts</groupId>
<artifactId>javax.net.websocket-api</artifactId>
<version>0.006.0.EDR-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-api</artifactId>

View File

@ -186,7 +186,7 @@ public class Parser
}
try
{
incomingFramesHandler.incoming(f);
incomingFramesHandler.incomingFrame(f);
}
catch (WebSocketException e)
{
@ -206,7 +206,7 @@ public class Parser
{
return;
}
incomingFramesHandler.incoming(e);
incomingFramesHandler.incomingError(e);
}
public synchronized void parse(ByteBuffer buffer)

View File

@ -100,6 +100,13 @@ public class RequestedExtensionConfig implements ExtensionConfig
return parameters.keySet();
}
@Override
public Map<String, String> getParameters()
{
// TODO Auto-generated method stub
return null;
}
/**
* Initialize the parameters on this config from the other configuration.
* @param other the other configuration.

View File

@ -20,6 +20,9 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import javax.net.websocket.SendHandler;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
@ -104,6 +107,14 @@ public class WebSocketFrame implements Frame
private Type type;
private boolean continuation = false;
private int continuationIndex = 0;
/**
* Optional operation.
* <p>
* Hack, should not really be in the WebSocketFrame, send results back to this handler when frame was successfully sent (or not).
* <p>
* Kept in WebSocketFrame as a result of the [006EDR] of JavaWebSocket API lacking the ability to track this SendHandler through Extensions.
*/
private SendHandler sendHandler;
/**
* Default constructor
@ -318,6 +329,11 @@ public class WebSocketFrame implements Frame
return payloadStart;
}
public SendHandler getSendResult()
{
return sendHandler;
}
@Override
public Type getType()
{
@ -329,6 +345,7 @@ public class WebSocketFrame implements Frame
return ((data != null) && (payloadLength > 0));
}
@Override
public boolean isContinuation()
{
return continuation;
@ -385,6 +402,20 @@ public class WebSocketFrame implements Frame
return rsv3;
}
public void notifySendHandler() {
if(sendHandler == null) {
return;
}
sendHandler.setResult(new SendResult(null));
}
public void notifySendHandler(Throwable t) {
if(sendHandler == null) {
return;
}
sendHandler.setResult(new SendResult(t));
}
/**
* Get the position currently within the payload data.
* <p>
@ -582,6 +613,11 @@ public class WebSocketFrame implements Frame
return this;
}
public void setSendHandler(SendHandler handler)
{
this.sendHandler = handler;
}
@Override
public String toString()
{
@ -596,6 +632,10 @@ public class WebSocketFrame implements Frame
b.append(rsv3?'1':'.');
b.append(",masked=").append(masked);
b.append(",continuation=").append(continuation);
if (sendHandler != null)
{
b.append(",sendHandler=").append(sendHandler);
}
b.append(']');
return b.toString();
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.io.InputStream;
@ -28,11 +28,11 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.io.message.MessageAppender;
import org.eclipse.jetty.websocket.common.io.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.io.message.MessageReader;
import org.eclipse.jetty.websocket.common.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
/**
* Handler for Annotated User WebSocket objects.

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -31,16 +31,17 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.IncomingFrames;
import org.eclipse.jetty.websocket.common.io.WebSocketSession;
/**
* EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
*/
public abstract class EventDriver implements FrameHandler
public abstract class EventDriver extends AbstractJettyFrameHandler implements IncomingFrames
{
protected final Logger LOG;
protected final WebSocketPolicy policy;
@ -49,6 +50,7 @@ public abstract class EventDriver implements FrameHandler
public EventDriver(WebSocketPolicy policy, Object websocket)
{
super(null);
this.policy = policy;
this.websocket = websocket;
this.LOG = Log.getLogger(websocket.getClass());
@ -65,7 +67,30 @@ public abstract class EventDriver implements FrameHandler
}
@Override
public void handleFrame(Frame frame)
public void handleJettyFrame(WebSocketFrame frame)
{
incomingFrame(frame);
}
@Override
public final void incomingError(WebSocketException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("incoming(WebSocketException)",e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
onException(e);
}
@Override
public void incomingFrame(WebSocketFrame frame)
{
if (LOG.isDebugEnabled())
{
@ -110,7 +135,7 @@ public abstract class EventDriver implements FrameHandler
LOG.debug("Pong with {}",BufferUtil.toDetailString(pongBuf));
}
}
session.output("pong",pong);
session.outgoingFrame(pong);
break;
}
case OpCode.BINARY:
@ -139,23 +164,6 @@ public abstract class EventDriver implements FrameHandler
}
}
// FIXME: need flow of exceptions ...
public final void incoming(WebSocketException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("incoming(WebSocketException)",e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
}
onException(e);
}
public abstract void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException;
public abstract void onBinaryMessage(byte[] data);

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.io.InputStream;
import java.io.Reader;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.io.InputStream;
import java.io.Reader;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
/**
* A representation of the methods available to call for a particular class.

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -26,9 +26,9 @@ import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.io.message.MessageAppender;
import org.eclipse.jetty.websocket.common.io.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.io.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
/**
* Handler for {@link WebSocketListener} based User WebSocket implementations.

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.event;
package org.eclipse.jetty.websocket.common.events;
import java.util.ArrayList;

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.common.extensions;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -53,6 +55,12 @@ public abstract class AbstractExtension implements Extension
return config.getName();
}
@Override
public Map<String, String> getParameters()
{
return config.getParameters();
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -129,4 +137,10 @@ public abstract class AbstractExtension implements Extension
{
this.policy = policy;
}
@Override
public String toString()
{
return String.format("%s[%s]",config.getName(),config.getParameterizedName());
}
}

View File

@ -0,0 +1,75 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public abstract class AbstractJettyFrameHandler extends FrameHandler
{
private Logger log;
public AbstractJettyFrameHandler(FrameHandler nextHandler)
{
super(nextHandler);
log = Log.getLogger(this.getClass());
}
/**
* Incoming for javax.net.websocket based frames.
* <p>
* Note: As of 006EDR of the JavaWebSocket API is, the {@link javax.net.websocket.extensions.Frame} interface is insufficient to handle the intricacies of
* Extensions. The implementations of {@link javax.net.websocket.extensions.Extension} within Jetty will attempt to cast to
* {@link org.eclipse.jetty.websocket.common.WebSocketFrame} to handle only jetty managed/created WebSocketFrames.
*/
@Override
public final void handleFrame(javax.net.websocket.extensions.Frame f)
{
if (f instanceof WebSocketFrame)
{
handleJettyFrame((WebSocketFrame)f);
}
else
{
// [006EDR]
throw new RuntimeException("Unsupported, Frame type [" + f.getClass().getName() + "] must be an instanceof [" + WebSocketFrame.class.getName()
+ "]");
}
}
public abstract void handleJettyFrame(WebSocketFrame frame);
protected void nextJettyHandler(WebSocketFrame frame)
{
FrameHandler next = getNextHandler();
if (next == null)
{
log.debug("No next handler (ending chain) {}",frame);
return;
}
if (log.isDebugEnabled())
{
log.debug("nextHandler({}) -> {}",frame,next);
}
next.handleFrame(frame);
}
}

View File

@ -18,41 +18,36 @@
package org.eclipse.jetty.websocket.common.extensions;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.api.extensions.FrameHandlerWrapper;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.io.IncomingFrames;
public abstract class FrameHandlerAdapter implements FrameHandlerWrapper
/**
* Route jetty internal frames into extension {@link FrameHandler} concept.
*/
public class IncomingFrameHandler implements IncomingFrames
{
private Logger log;
private FrameHandler nextHandler;
private static final Logger LOG = Log.getLogger(IncomingFrameHandler.class);
private FrameHandler extHandler;
public FrameHandlerAdapter()
public IncomingFrameHandler(FrameHandler nextHandler)
{
log = Log.getLogger(this.getClass());
this.extHandler = nextHandler;
}
@Override
public FrameHandler getNextHandler()
public void incomingError(WebSocketException e)
{
return nextHandler;
}
protected void nextHandler(Frame frame)
{
if (log.isDebugEnabled())
{
log.debug("nextHandler({}) -> {}",frame,nextHandler);
}
nextHandler.handleFrame(frame);
LOG.info("Not able to forward error into extension stack",e);
}
@Override
public void setNextHandler(FrameHandler handler)
public void incomingFrame(WebSocketFrame frame)
{
this.nextHandler = handler;
extHandler.handleFrame(frame);
}
}

View File

@ -0,0 +1,81 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* Compress individual data frames.
*/
public class CompressFrameHandler extends AbstractJettyFrameHandler
{
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public CompressFrameHandler(FrameHandler outgoing, DeflateCompressionMethod method, ByteBufferPool bufferPool)
{
super(outgoing);
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
if (frame.getType().isControl())
{
// skip, cannot compress control frames.
nextJettyHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
}
nextJettyHandler(out);
}
// reset on every frame.
method.compress().end();
}
finally
{
// free original data buffer
bufferPool.release(data);
}
}
}

View File

@ -0,0 +1,88 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* Compress entire message (regardless of number of frames).
*/
public class CompressMessageHandler extends AbstractJettyFrameHandler
{
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public CompressMessageHandler(FrameHandler nextHandler, CompressionMethod method, ByteBufferPool bufferPool)
{
super(nextHandler);
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
if (frame.getType().isControl())
{
// skip, cannot compress control frames.
nextJettyHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
nextJettyHandler(out);
}
else
{
nextJettyHandler(out);
}
}
// reset only at end of message
if (frame.isFin())
{
method.compress().end();
}
}
finally
{
// free original data buffer
bufferPool.release(data);
}
}
}

View File

@ -0,0 +1,83 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* Handler for Decompress of individual Data frames.
*/
public class DecompressFrameHandler extends AbstractJettyFrameHandler
{
private static final Logger LOG = Log.getLogger(DecompressFrameHandler.class);
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public DecompressFrameHandler(FrameHandler incoming, DeflateCompressionMethod method, ByteBufferPool bufferPool)
{
super(incoming);
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
if (frame.getType().isControl() || !frame.isRsv1())
{
// Cannot modify incoming control frames or ones with RSV1 set.
nextJettyHandler(frame);
return;
}
LOG.debug("Decompressing Frame: {}",frame);
ByteBuffer data = frame.getPayload();
try
{
method.decompress().input(data);
while (!method.decompress().isDone())
{
ByteBuffer uncompressed = method.decompress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
nextJettyHandler(out);
}
// reset on every frame.
method.decompress().end();
}
finally
{
// release original buffer (no longer needed)
bufferPool.release(data);
}
}
}

View File

@ -0,0 +1,85 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* Decompress entire message (regardless of number of frames)
*/
public class DecompressMessageHandler extends AbstractJettyFrameHandler
{
private CompressionMethod method;
private final ByteBufferPool bufferPool;
public DecompressMessageHandler(FrameHandler nextHandler, CompressionMethod method, ByteBufferPool bufferPool)
{
super(nextHandler);
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
if (frame.getType().isControl() || !frame.isRsv1())
{
// Cannot modify incoming control frames or ones with RSV1 set.
nextJettyHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
method.decompress().input(data);
while (!method.decompress().isDone())
{
ByteBuffer uncompressed = method.decompress().process();
if (uncompressed == null)
{
continue;
}
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
nextJettyHandler(out);
}
// reset only at the end of a message.
if (frame.isFin())
{
method.decompress().end();
}
}
finally
{
// release original buffer (no longer needed)
bufferPool.release(data);
}
}
}

View File

@ -18,17 +18,11 @@
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
/**
* Per Message Compression extension for WebSocket.
@ -37,130 +31,20 @@ import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
*/
public class PerMessageCompressionExtension extends AbstractExtension
{
private static class CompressMessageHandler extends FrameHandlerAdapter
{
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public CompressMessageHandler(CompressionMethod method, ByteBufferPool bufferPool)
{
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleFrame(Frame frame)
{
if (frame instanceof Frame.Control)
{
// skip, cannot compress control frames.
nextHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
nextHandler(out);
}
else
{
nextHandler(out);
}
}
// reset only at end of message
if (frame.isFin())
{
method.compress().end();
}
}
finally
{
// free original data buffer
bufferPool.release(data);
}
}
}
private static class DecompressMessageHandler extends FrameHandlerAdapter
{
private CompressionMethod method;
private final ByteBufferPool bufferPool;
public DecompressMessageHandler(CompressionMethod method, ByteBufferPool bufferPool)
{
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleFrame(Frame frame)
{
if ((frame instanceof Frame.Control) || !frame.isRsv1())
{
// Cannot modify incoming control frames or ones with RSV1 set.
nextHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
method.decompress().input(data);
while (!method.decompress().isDone())
{
ByteBuffer uncompressed = method.decompress().process();
if (uncompressed == null)
{
continue;
}
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
nextHandler(out);
}
// reset only at the end of a message.
if (frame.isFin())
{
method.decompress().end();
}
}
finally
{
// release original buffer (no longer needed)
bufferPool.release(data);
}
}
}
private static final Logger LOG = Log.getLogger(PerMessageCompressionExtension.class);
private CompressionMethod method;
@Override
public FrameHandler createIncomingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createIncomingFrameHandler(javax.net.websocket.extensions.FrameHandler incoming)
{
return new DecompressMessageHandler(method,getBufferPool());
return new DecompressMessageHandler(incoming,method,getBufferPool());
}
@Override
public FrameHandler createOutgoingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createOutgoingFrameHandler(javax.net.websocket.extensions.FrameHandler outgoing)
{
return new CompressMessageHandler(method,getBufferPool());
return new CompressMessageHandler(outgoing,method,getBufferPool());
}
/**

View File

@ -18,17 +18,9 @@
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
/**
* Implementation of the <a href="https://tools.ietf.org/id/draft-tyoshino-hybi-websocket-perframe-deflate-05.txt">x-webkit-deflate-frame</a> extension seen out
@ -36,120 +28,18 @@ import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
*/
public class WebkitDeflateFrameExtension extends AbstractExtension
{
private static class CompressFrameHandler extends FrameHandlerAdapter
{
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public CompressFrameHandler(DeflateCompressionMethod method, ByteBufferPool bufferPool)
{
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleFrame(Frame frame)
{
if (frame instanceof Frame.Control)
{
// skip, cannot compress control frames.
nextHandler(frame);
return;
}
ByteBuffer data = frame.getPayload();
try
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
}
nextHandler(out);
}
// reset on every frame.
method.compress().end();
}
finally
{
// free original data buffer
bufferPool.release(data);
}
}
}
private static class DecompressFrameHandler extends FrameHandlerAdapter
{
private final CompressionMethod method;
private final ByteBufferPool bufferPool;
public DecompressFrameHandler(DeflateCompressionMethod method, ByteBufferPool bufferPool)
{
this.method = method;
this.bufferPool = bufferPool;
}
@Override
public void handleFrame(Frame frame)
{
if ((frame instanceof Frame.Control) || !frame.isRsv1())
{
// Cannot modify incoming control frames or ones with RSV1 set.
nextHandler(frame);
return;
}
LOG.debug("Decompressing Frame: {}",frame);
ByteBuffer data = frame.getPayload();
try
{
method.decompress().input(data);
while (!method.decompress().isDone())
{
ByteBuffer uncompressed = method.decompress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
nextHandler(out);
}
// reset on every frame.
method.decompress().end();
}
finally
{
// release original buffer (no longer needed)
bufferPool.release(data);
}
}
}
private static final Logger LOG = Log.getLogger(WebkitDeflateFrameExtension.class);
private DeflateCompressionMethod method;
@Override
public FrameHandler createIncomingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createIncomingFrameHandler(javax.net.websocket.extensions.FrameHandler incoming)
{
return new DecompressFrameHandler(method,getBufferPool());
return new DecompressFrameHandler(incoming,method,getBufferPool());
}
@Override
public FrameHandler createOutgoingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createOutgoingFrameHandler(javax.net.websocket.extensions.FrameHandler outgoing)
{
return new CompressFrameHandler(method,getBufferPool());
return new CompressFrameHandler(outgoing,method,getBufferPool());
}
/**

View File

@ -18,93 +18,27 @@
package org.eclipse.jetty.websocket.common.extensions.fragment;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
/**
* Fragment Extension
*/
public class FragmentExtension extends AbstractExtension
{
/**
* Handler to break apart the frames into multiple smaller frames.
*/
private class FragmentHandler extends FrameHandlerAdapter
{
@Override
public void handleFrame(Frame frame)
{
if (frame instanceof Frame.Control)
{
// Cannot fragment Control Frames
nextHandler(frame);
return;
}
int length = frame.getPayloadLength();
byte opcode = frame.getOpCode(); // original opcode
ByteBuffer payload = frame.getPayload().slice();
int originalLimit = payload.limit();
int currentPosition = payload.position();
if (maxLength <= 0)
{
// output original frame
nextHandler(frame);
return;
}
boolean continuation = false;
// break apart payload based on maxLength rules
while (length > maxLength)
{
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(false); // always false here
frag.setContinuation(continuation);
payload.position(currentPosition);
payload.limit(Math.min(payload.position() + maxLength,originalLimit));
frag.setPayload(payload);
nextHandler(frag);
length -= maxLength;
opcode = OpCode.CONTINUATION;
continuation = true;
currentPosition = payload.limit();
}
// write remaining
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(frame.isFin()); // use original fin
frag.setContinuation(continuation);
payload.position(currentPosition);
payload.limit(originalLimit);
frag.setPayload(payload);
nextHandler(frag);
}
}
private int maxLength = -1;
@Override
public FrameHandler createIncomingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createIncomingFrameHandler(javax.net.websocket.extensions.FrameHandler incoming)
{
return new FragmentHandler();
return new FragmentHandler(incoming,maxLength);
}
@Override
public FrameHandler createOutgoingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createOutgoingFrameHandler(javax.net.websocket.extensions.FrameHandler outgoing)
{
return new FragmentHandler();
return new FragmentHandler(outgoing,maxLength);
}
@Override

View File

@ -0,0 +1,83 @@
package org.eclipse.jetty.websocket.common.extensions.fragment;
import java.nio.ByteBuffer;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* Handler to break apart the frames into multiple smaller frames.
*/
public class FragmentHandler extends AbstractJettyFrameHandler
{
private final int maxLength;
/**
* @param fragmentExtension
*/
public FragmentHandler(FrameHandler nextHandler, int maxLength)
{
super(nextHandler);
this.maxLength = maxLength;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
if (frame.getType().isControl())
{
// Cannot fragment Control Frames
nextJettyHandler(frame);
return;
}
int length = frame.getPayloadLength();
byte opcode = frame.getType().getOpCode(); // original opcode
ByteBuffer payload = frame.getPayload().slice();
int originalLimit = payload.limit();
int currentPosition = payload.position();
if (maxLength <= 0)
{
// output original frame
nextJettyHandler(frame);
return;
}
boolean continuation = false;
// break apart payload based on maxLength rules
while (length > maxLength)
{
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(false); // always false here
frag.setContinuation(continuation);
payload.position(currentPosition);
payload.limit(Math.min(payload.position() + maxLength,originalLimit));
frag.setPayload(payload);
nextJettyHandler(frag);
length -= maxLength;
opcode = OpCode.CONTINUATION;
continuation = true;
currentPosition = payload.limit();
}
// write remaining
WebSocketFrame frag = new WebSocketFrame(frame);
frag.setOpCode(opcode);
frag.setFin(frame.isFin()); // use original fin
frag.setContinuation(continuation);
payload.position(currentPosition);
payload.limit(originalLimit);
frag.setPayload(payload);
nextJettyHandler(frag);
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.common.extensions.identity;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
public class IdentityExtension extends AbstractExtension
@ -28,15 +27,15 @@ public class IdentityExtension extends AbstractExtension
private String id;
@Override
public FrameHandler createIncomingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createIncomingFrameHandler(javax.net.websocket.extensions.FrameHandler incoming)
{
return new IdentityFrameHandler();
return new IdentityFrameHandler(incoming);
}
@Override
public FrameHandler createOutgoingFrameHandler()
public javax.net.websocket.extensions.FrameHandler createOutgoingFrameHandler(javax.net.websocket.extensions.FrameHandler outgoing)
{
return new IdentityFrameHandler();
return new IdentityFrameHandler(outgoing);
}
@Override

View File

@ -18,17 +18,24 @@
package org.eclipse.jetty.websocket.common.extensions.identity;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.extensions.FrameHandlerAdapter;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
/**
* FrameHandler that just passes frames through with no modification.
*/
public class IdentityFrameHandler extends FrameHandlerAdapter
public class IdentityFrameHandler extends AbstractJettyFrameHandler
{
@Override
public void handleFrame(Frame frame)
public IdentityFrameHandler(FrameHandler nextHandler)
{
nextHandler(frame);
super(nextHandler);
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
nextJettyHandler(frame);
}
}

View File

@ -18,10 +18,9 @@
package org.eclipse.jetty.websocket.common.extensions.mux;
import org.eclipse.jetty.util.Callback;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
/**
@ -41,21 +40,15 @@ public abstract class AbstractMuxExtension extends AbstractExtension
public abstract void configureMuxer(Muxer muxer);
@Override
public void incoming(WebSocketException e)
public FrameHandler createIncomingFrameHandler(FrameHandler incoming)
{
muxer.incoming(e);
return new MuxerIncomingFrameHandler(incoming,muxer);
}
@Override
public void incoming(WebSocketFrame frame)
public FrameHandler createOutgoingFrameHandler(FrameHandler outgoing)
{
muxer.incoming(frame);
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws java.io.IOException
{
nextOutput(context,callback,frame);
return new MuxerOutgoingFrameHandler(outgoing,muxer);
}
@Override
@ -66,7 +59,7 @@ public abstract class AbstractMuxExtension extends AbstractExtension
{
throw new RuntimeException("Cannot reset muxer physical connection once established");
}
this.muxer = new Muxer(connection,this);
this.muxer = new Muxer(connection);
configureMuxer(this.muxer);
}
}

View File

@ -20,14 +20,14 @@ package org.eclipse.jetty.websocket.common.extensions.mux;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -87,7 +87,7 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
CloseInfo close = new CloseInfo(statusCode,reason);
try
{
output("<close>",new FutureCallback<>(),close.asFrame());
outgoingFrame(close.asFrame());
}
catch (IOException e)
{
@ -114,6 +114,13 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
return channelId;
}
@Override
public InetSocketAddress getLocalAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public WebSocketPolicy getPolicy()
{
@ -126,6 +133,13 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
return muxer.getRemoteAddress();
}
@Override
public URI getRequestURI()
{
// TODO Auto-generated method stub
return null;
}
public WebSocketSession getSession()
{
return session;
@ -147,18 +161,18 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
* Incoming exceptions from Muxer.
*/
@Override
public void incoming(WebSocketException e)
public void incomingError(WebSocketException e)
{
incoming.incoming(e);
incoming.incomingError(e);
}
/**
* Incoming frames from Muxer
*/
@Override
public void incoming(WebSocketFrame frame)
public void incomingFrame(WebSocketFrame frame)
{
incoming.incoming(frame);
incoming.incomingFrame(frame);
}
public boolean isActive()
@ -235,18 +249,18 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
* Frames destined for the Muxer
*/
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
muxer.output(context,callback,channelId,frame);
return muxer.output(channelId,frame);
}
/**
* Ping frame destined for the Muxer
*/
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
public Future<SendResult> ping(byte[] payload) throws IOException
{
output(context,callback,WebSocketFrame.ping().setPayload(payload));
return outgoingFrame(WebSocketFrame.ping().setPayload(payload));
}
@Override
@ -292,25 +306,26 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
if (extensions != null)
{
Iterator<AbstractExtension> extIter;
// Connect outgoings
extIter = extensions.iterator();
while (extIter.hasNext())
{
AbstractExtension ext = extIter.next();
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
}
// Connect incomings
Collections.reverse(extensions);
extIter = extensions.iterator();
while (extIter.hasNext())
{
AbstractExtension ext = extIter.next();
ext.setNextIncomingFrames(incoming);
incoming = ext;
}
// FIXME
// Iterator<AbstractExtension> extIter;
// // Connect outgoings
// extIter = extensions.iterator();
// while (extIter.hasNext())
// {
// AbstractExtension ext = extIter.next();
// ext.setNextOutgoingFrames(outgoing);
// outgoing = ext;
// }
//
// // Connect incomings
// Collections.reverse(extensions);
// extIter = extensions.iterator();
// while (extIter.hasNext())
// {
// AbstractExtension ext = extIter.next();
// ext.setNextIncomingFrames(incoming);
// incoming = ext;
// }
}
// set outgoing
@ -321,26 +336,26 @@ public class MuxChannel implements WebSocketConnection, InternalConnection, Inco
* Generate a binary message, destined for Muxer
*/
@Override
public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
public Future<SendResult> write(byte[] buf, int offset, int len) throws IOException
{
output(context,callback,WebSocketFrame.binary().setPayload(buf,offset,len));
return outgoingFrame(WebSocketFrame.binary().setPayload(buf,offset,len));
}
/**
* Generate a binary message, destined for Muxer
*/
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
public Future<SendResult> write(ByteBuffer buffer) throws IOException
{
output(context,callback,WebSocketFrame.binary().setPayload(buffer));
return outgoingFrame(WebSocketFrame.binary().setPayload(buffer));
}
/**
* Generate a text message, destined for Muxer
*/
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
public Future<SendResult> write(String message) throws IOException
{
output(context,callback,WebSocketFrame.text(message));
return outgoingFrame(WebSocketFrame.text(message));
}
}

View File

@ -20,12 +20,13 @@ package org.eclipse.jetty.websocket.common.extensions.mux;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelRequest;
import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelResponse;
@ -55,9 +56,32 @@ public class MuxGenerator
this.bufferPool = bufferPool;
}
public void generate(long channelId, WebSocketFrame frame) throws IOException
public Future<SendResult> generate(long channelId, WebSocketFrame frame) throws IOException
{
output(null, new FutureCallback<>(), channelId, frame);
ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
BufferUtil.flipToFill(muxPayload);
// start building mux payload
writeChannelId(muxPayload,channelId);
byte b = (byte)(frame.isFin()?0x80:0x00); // fin
b |= (byte)(frame.isRsv1()?0x40:0x00); // rsv1
b |= (byte)(frame.isRsv2()?0x20:0x00); // rsv2
b |= (byte)(frame.isRsv3()?0x10:0x00); // rsv3
b |= (byte)(frame.getOpCode() & 0x0F); // opcode
muxPayload.put(b);
BufferUtil.put(frame.getPayload(),muxPayload);
// build muxed frame
WebSocketFrame muxFrame = WebSocketFrame.binary();
BufferUtil.flipToFlush(muxPayload,0);
muxFrame.setPayload(muxPayload);
// NOTE: the physical connection will handle masking rules for this frame.
// release original buffer (no longer needed)
bufferPool.release(frame.getPayload());
// send muxed frame down to the physical connection.
return outgoing.outgoingFrame(muxFrame);
}
public void generate(MuxControlBlock... blocks) throws IOException
@ -143,7 +167,7 @@ public class MuxGenerator
BufferUtil.flipToFlush(payload,0);
WebSocketFrame frame = WebSocketFrame.binary();
frame.setPayload(payload);
outgoing.output(null,new FutureCallback<>(),frame);
outgoing.outgoingFrame(frame);
}
public OutgoingFrames getOutgoing()
@ -151,34 +175,6 @@ public class MuxGenerator
return outgoing;
}
public <C> void output(C context, Callback<C> callback, long channelId, WebSocketFrame frame) throws IOException
{
ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
BufferUtil.flipToFill(muxPayload);
// start building mux payload
writeChannelId(muxPayload,channelId);
byte b = (byte)(frame.isFin()?0x80:0x00); // fin
b |= (byte)(frame.isRsv1()?0x40:0x00); // rsv1
b |= (byte)(frame.isRsv2()?0x20:0x00); // rsv2
b |= (byte)(frame.isRsv3()?0x10:0x00); // rsv3
b |= (byte)(frame.getOpCode() & 0x0F); // opcode
muxPayload.put(b);
BufferUtil.put(frame.getPayload(),muxPayload);
// build muxed frame
WebSocketFrame muxFrame = WebSocketFrame.binary();
BufferUtil.flipToFlush(muxPayload,0);
muxFrame.setPayload(muxPayload);
// NOTE: the physical connection will handle masking rules for this frame.
// release original buffer (no longer needed)
bufferPool.release(frame.getPayload());
// send muxed frame down to the physical connection.
outgoing.output(context,callback,muxFrame);
}
public void setOutgoing(OutgoingFrames outgoing)
{
this.outgoing = outgoing;

View File

@ -18,13 +18,16 @@
package org.eclipse.jetty.websocket.common.extensions.mux;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.RequestedExtensionConfig;
public class MuxRequest implements UpgradeRequest
@ -64,15 +67,13 @@ public class MuxRequest implements UpgradeRequest
private String queryString;
private List<String> subProtocols;
private Map<String, String> cookies;
private List<RequestedExtensionConfig> extensions;
private List<ExtensionConfig> extensions;
private Map<String, List<String>> headers;
private Map<String, String[]> parameterMap;
public MuxRequest()
{
// TODO Auto-generated constructor stub
}
public MuxRequest(UpgradeRequest copy)
{
// TODO Auto-generated constructor stub
@ -94,7 +95,7 @@ public class MuxRequest implements UpgradeRequest
}
@Override
public List<RequestedExtensionConfig> getExtensions()
public List<ExtensionConfig> getExtensions()
{
return extensions;
}
@ -176,12 +177,33 @@ public class MuxRequest implements UpgradeRequest
return remoteURI;
}
@Override
public URI getRequestURI()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Object getSession()
{
// TODO Auto-generated method stub
return null;
}
@Override
public List<String> getSubProtocols()
{
return subProtocols;
}
@Override
public Principal getUserPrincipal()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean hasSubProtocol(String test)
{
@ -201,6 +223,13 @@ public class MuxRequest implements UpgradeRequest
return test.equalsIgnoreCase(getOrigin());
}
@Override
public boolean isUserInRole(String role)
{
// TODO Auto-generated method stub
return false;
}
@Override
public void setSubProtocols(String protocols)
{

View File

@ -21,11 +21,12 @@ package org.eclipse.jetty.websocket.common.extensions.mux;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.RequestedExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
public class MuxResponse implements UpgradeResponse
{
@ -43,7 +44,7 @@ public class MuxResponse implements UpgradeResponse
}
@Override
public List<RequestedExtensionConfig> getExtensions()
public List<ExtensionConfig> getExtensions()
{
// TODO Auto-generated method stub
return null;
@ -56,6 +57,13 @@ public class MuxResponse implements UpgradeResponse
return null;
}
@Override
public Map<String, List<String>> getHeaders()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHeaderValue(String name)
{
@ -106,7 +114,7 @@ public class MuxResponse implements UpgradeResponse
}
@Override
public void setExtensions(List<RequestedExtensionConfig> extensions)
public void setExtensions(List<ExtensionConfig> extensions)
{
// TODO Auto-generated method stub

View File

@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -78,14 +80,13 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
/** The original response headers, used for delta encoded AddChannelResponse blocks */
private List<String> physicalResponseHeaders;
public Muxer(final WebSocketConnection connection, final OutgoingFrames outgoing)
public Muxer(final WebSocketConnection connection)
{
this.physicalConnection = connection;
this.policy = connection.getPolicy().clonePolicy();
this.parser = new MuxParser();
this.parser.setEvents(this);
this.generator = new MuxGenerator();
this.generator.setOutgoing(outgoing);
}
public MuxAddClient getAddClient()
@ -137,19 +138,21 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
}
/**
* Incoming exceptions encountered during parsing of mux encapsulated frames.
* Incoming parser errors
*/
@Override
public void incoming(WebSocketException e)
public void incomingError(WebSocketException e)
{
// TODO Notify Control Channel 0
MuxDropChannel.Reason reason = MuxDropChannel.Reason.PHYSICAL_CONNECTION_FAILED;
String phrase = String.format("%s: %s", e.getClass().getName(), e.getMessage());
mustFailPhysicalConnection(new MuxPhysicalConnectionException(reason,phrase));
}
/**
* Incoming mux encapsulated frames.
*/
@Override
public void incoming(WebSocketFrame frame)
public void incomingFrame(WebSocketFrame frame)
{
parser.parse(frame);
}
@ -242,11 +245,14 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
}
default:
{
// TODO: ERROR
break;
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unrecognized request encoding");
}
}
}
catch (MuxPhysicalConnectionException e)
{
throw e;
}
catch (Throwable t)
{
throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unable to parse request",t);
@ -315,7 +321,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
public void onMuxedFrame(MuxedFrame frame)
{
MuxChannel subchannel = channels.get(frame.getChannelId());
subchannel.incoming(frame);
subchannel.incomingFrame(frame);
}
@Override
@ -327,7 +333,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
}
LOG.warn(e);
// TODO: handle other mux exceptions?
// TODO: handle other (non physical) mux exceptions how?
}
/**
@ -377,13 +383,13 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
/**
* Outgoing frame, without mux encapsulated payload.
*/
public <C> void output(C context, Callback<C> callback, long channelId, WebSocketFrame frame) throws IOException
public Future<SendResult> output(long channelId, WebSocketFrame frame) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("output({}, {}, {}, {})",context,callback,channelId,frame);
LOG.debug("output({}, {})",channelId,frame);
}
generator.output(context,callback,channelId,frame);
return generator.generate(channelId,frame);
}
/**
@ -408,6 +414,11 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
this.addServer = addServer;
}
public void setOutgoingFramesHandler(OutgoingFrames outgoing)
{
this.generator.setOutgoing(outgoing);
}
/**
* Set the remote address of the physical connection.
* <p>

View File

@ -16,14 +16,30 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.api.extensions;
package org.eclipse.jetty.websocket.common.extensions.mux;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.IncomingFrames;
/**
* Represents a Handler that processes and delegates results to a child {@link FrameHandler}
* Process incoming frames and forward them off to the Muxer.
*/
public interface FrameHandlerWrapper extends FrameHandler
public class MuxerIncomingFrameHandler extends AbstractJettyFrameHandler
{
public FrameHandler getNextHandler();
private final IncomingFrames muxerHandler;
public void setNextHandler(FrameHandler handler);
public MuxerIncomingFrameHandler(FrameHandler nextHandler, Muxer muxer)
{
super(nextHandler);
this.muxerHandler = muxer;
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
this.muxerHandler.incomingFrame(frame);
}
}

View File

@ -0,0 +1,57 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.extensions.mux;
import java.io.IOException;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import javax.net.websocket.extensions.FrameHandler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.OutgoingFrames;
/**
* Process frames destined for the network layer.
* <p>
* Outgoing frames are not mangled by this handler, as they have already been captured by the {@link Muxer} and encapsulated.
*/
public class MuxerOutgoingFrameHandler extends AbstractJettyFrameHandler implements OutgoingFrames
{
public MuxerOutgoingFrameHandler(FrameHandler nextHandler, Muxer muxer)
{
super(nextHandler);
muxer.setOutgoingFramesHandler(this);
}
@Override
public void handleJettyFrame(WebSocketFrame frame)
{
// pass through. Muxer already encapsulated frame.
nextJettyHandler(frame);
}
@Override
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
// TODO Auto-generated method stub
return null;
}
}

View File

@ -28,6 +28,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -41,13 +43,13 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.RequestedExtensionConfig;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
@ -101,7 +103,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
enqueClose(statusCode,reason);
}
public <C> void complete(FrameBytes<C> frameBytes)
public <C> void complete(FrameBytes frameBytes)
{
synchronized (queue)
{
@ -146,12 +148,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private void enqueClose(int statusCode, String reason)
{
CloseInfo close = new CloseInfo(statusCode,reason);
output(null,close.asFrame());
try
{
outgoingFrame(close.asFrame());
}
catch (IOException e)
{
LOG.info("Unable to enque close frame",e);
// TODO: now what?
disconnect();
}
}
public void flush()
{
FrameBytes<?> frameBytes = null;
FrameBytes frameBytes = null;
ByteBuffer buffer = null;
synchronized (queue)
{
@ -363,26 +374,30 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Enqueue internal frame from {@link OutgoingFrames} stack for eventual write out on the physical connection.
*/
@Override
public <C> Future<C> output(C context, Frame frame)
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("output({}, {})",context,frame);
LOG.debug("output({})",frame);
}
Future<SendResult> future = null;
synchronized (queue)
{
FrameBytes<C> bytes = null;
FrameBytes bytes = null;
if (frame.getType().isControl())
{
bytes = new ControlFrameBytes<C>(this,context,frame);
bytes = new ControlFrameBytes(this,frame);
}
else
{
bytes = new DataFrameBytes<C>(this,context,frame);
bytes = new DataFrameBytes(this,frame);
}
future = bytes;
scheduleTimeout(bytes);
if (isOpen())
@ -397,7 +412,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
}
flush();
return future;
}
private int read(ByteBuffer buffer)
@ -450,7 +468,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
private <C> void scheduleTimeout(FrameBytes<C> bytes)
private void scheduleTimeout(FrameBytes bytes)
{
if (policy.getIdleTimeout() > 0)
{
@ -490,7 +508,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
}
private <C> void write(ByteBuffer buffer, FrameBytes<C> frameBytes)
private <C> void write(ByteBuffer buffer, FrameBytes frameBytes)
{
EndPoint endpoint = getEndPoint();
@ -507,11 +525,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
try
{
endpoint.write(frameBytes.context,frameBytes,buffer);
endpoint.write(null,frameBytes,buffer);
}
catch (Throwable t)
{
frameBytes.failed(frameBytes.context,t);
frameBytes.failed(null,t);
}
}
}

View File

@ -20,26 +20,28 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public class ControlFrameBytes<C> extends FrameBytes<C>
public class ControlFrameBytes extends FrameBytes
{
private static final Logger LOG = Log.getLogger(ControlFrameBytes.class);
private ByteBuffer buffer;
private ByteBuffer origPayload;
public ControlFrameBytes(AbstractWebSocketConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
public ControlFrameBytes(AbstractWebSocketConnection connection, WebSocketFrame frame)
{
super(connection,callback,context,frame);
super(connection,frame);
}
@Override
public void completed(C context) {
public void completed(SendResult context)
{
LOG.debug("completed() - frame: {}",frame);
connection.getBufferPool().release(buffer);

View File

@ -20,27 +20,28 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public class DataFrameBytes<C> extends FrameBytes<C>
public class DataFrameBytes extends FrameBytes
{
private static final Logger LOG = Log.getLogger(DataFrameBytes.class);
private ByteBuffer buffer;
public DataFrameBytes(AbstractWebSocketConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
public DataFrameBytes(AbstractWebSocketConnection connection, WebSocketFrame frame)
{
super(connection,callback,context,frame);
super(connection,frame);
}
@Override
public void completed(C context)
public void completed(SendResult result)
{
if (LOG.isDebugEnabled())
{
LOG.debug("completed({}) - frame.remaining() = {}",context,frame.remaining());
LOG.debug("completed({}) - frame.remaining() = {}",result,frame.remaining());
}
connection.getBufferPool().release(buffer);
@ -57,7 +58,7 @@ public class DataFrameBytes<C> extends FrameBytes<C>
else
{
LOG.debug("Send complete");
super.completed(context);
super.completed(result);
}
connection.flush();
}
@ -73,7 +74,7 @@ public class DataFrameBytes<C> extends FrameBytes<C>
}
catch (Throwable x)
{
failed(context,x);
failed(null,x);
return null;
}
}

View File

@ -21,28 +21,26 @@ package org.eclipse.jetty.websocket.common.io;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public abstract class FrameBytes<C> implements Callback<C>, Runnable
public abstract class FrameBytes extends FutureCallback<SendResult> implements Runnable
{
private final static Logger LOG = Log.getLogger(FrameBytes.class);
protected final AbstractWebSocketConnection connection;
protected final Callback<C> callback;
protected final C context;
protected final WebSocketFrame frame;
// Task used to timeout the bytes
protected volatile Scheduler.Task task;
protected FrameBytes(AbstractWebSocketConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
protected FrameBytes(AbstractWebSocketConnection connection, WebSocketFrame frame)
{
this.connection = connection;
this.callback = callback;
this.context = context;
this.frame = frame;
}
@ -56,31 +54,33 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
}
@Override
public void completed(C context)
public void completed(SendResult v)
{
super.completed(v);
if (LOG.isDebugEnabled())
{
LOG.debug("completed({}) - {}",context,this.getClass().getName());
LOG.debug("completed({}) - {}",v,this.getClass().getName());
}
cancelTask();
connection.complete(this);
callback.completed(context);
frame.notifySendHandler();
}
@Override
public void failed(C context, Throwable x)
public void failed(SendResult v, Throwable x)
{
super.failed(v,x);
if (x instanceof EofException)
{
// Abbreviate the EofException
LOG.warn("failed(" + context + ") - " + EofException.class);
LOG.warn("failed(" + v + ") - " + EofException.class);
}
else
{
LOG.warn("failed(" + context + ")",x);
LOG.warn("failed(" + v + ")",x);
}
cancelTask();
callback.failed(context,x);
frame.notifySendHandler(x);
}
public abstract ByteBuffer getByteBuffer();
@ -90,7 +90,7 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
{
// If this occurs we had a timeout!
connection.close();
failed(context, new InterruptedByTimeoutException());
failed(null,new InterruptedByTimeoutException());
}
@Override

View File

@ -0,0 +1,73 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.IOException;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public class FramePipes
{
private static class In2Out implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(FramePipes.In2Out.class);
private OutgoingFrames outgoing;
public In2Out(OutgoingFrames outgoing)
{
this.outgoing = outgoing;
}
@Override
public void incomingError(WebSocketException e)
{
/* cannot send exception on */
}
@Override
public void incomingFrame(WebSocketFrame frame)
{
try
{
this.outgoing.outgoingFrame(frame);
}
catch (IOException e)
{
LOG.debug(e);
}
}
}
private static class Out2In implements OutgoingFrames
{
private IncomingFrames incoming;
public Out2In(IncomingFrames incoming)
{
this.incoming = incoming;
}
@Override
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
this.incoming.incomingFrame(frame);
return null; // FIXME: should return completed future.
}
}
public static OutgoingFrames to(final IncomingFrames incoming)
{
return new Out2In(incoming);
}
public static IncomingFrames to(final OutgoingFrames outgoing)
{
return new In2Out(outgoing);
}
}

View File

@ -21,14 +21,14 @@ package org.eclipse.jetty.websocket.common.io;
import java.util.LinkedList;
@SuppressWarnings("serial")
public class FrameQueue extends LinkedList<FrameBytes<?>>
public class FrameQueue extends LinkedList<FrameBytes>
{
public void append(FrameBytes<?> bytes)
public void append(FrameBytes bytes)
{
addLast(bytes);
}
public void prepend(FrameBytes<?> bytes)
public void prepend(FrameBytes bytes)
{
addFirst(bytes);
}

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
*/
public interface IncomingFrames
{
public void incoming(WebSocketException e);
public void incomingError(WebSocketException e);
public void incoming(WebSocketFrame frame);
public void incomingFrame(WebSocketFrame frame);
}

View File

@ -21,12 +21,14 @@ package org.eclipse.jetty.websocket.common.io;
import java.io.IOException;
import java.util.concurrent.Future;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
/**
* Interface for dealing with frames outgoing to the network (eventually)
*/
public interface OutgoingFrames
{
<C> Future<C> output(C context, Frame frame) throws IOException;
Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException;
}

View File

@ -24,8 +24,9 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.SuspendToken;
@ -37,7 +38,7 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.io.event.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
public class WebSocketSession implements WebSocketConnection, LogicalConnection, IncomingFrames, OutgoingFrames
{
@ -130,25 +131,25 @@ public class WebSocketSession implements WebSocketConnection, LogicalConnection,
}
@Override
public void incoming(WebSocketException e)
public void incomingError(WebSocketException e)
{
if (baseConnection.isInputClosed())
{
return; // input is closed
}
// pass on incoming to websocket itself
websocket.incoming(e);
websocket.incomingError(e);
}
@Override
public void incoming(WebSocketFrame frame)
public void incomingFrame(WebSocketFrame frame)
{
if (baseConnection.isInputClosed())
{
return; // input is closed
}
// pass on incoming to websocket itself
websocket.incoming(frame);
websocket.incomingFrame(frame);
}
@Override
@ -188,27 +189,28 @@ public class WebSocketSession implements WebSocketConnection, LogicalConnection,
websocket.onConnect();
}
public <C> Future<C> output(C context, WebSocketFrame frame) throws IOException
@Override
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("output({},{}) - {}",context,frame,outgoing);
LOG.debug("output({}) - {}",frame,outgoing);
}
// forward on to chain
return outgoing.output(context,frame);
return outgoing.outgoingFrame(frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> Future<C> ping(C context, byte[] payload) throws IOException
public Future<SendResult> ping(byte[] payload) throws IOException
{
// Delegate the application called ping to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
frame.setFin(true);
output(context,callback,frame);
return outgoingFrame(frame);
}
public void setOutgoing(OutgoingFrames outgoing)
@ -239,86 +241,55 @@ public class WebSocketSession implements WebSocketConnection, LogicalConnection,
}
@Override
public <C> Future<C> write(C context, byte[] buf, int offset, int len) throws IOException
public Future<SendResult> write(byte[] buf, int offset, int len) throws IOException
{
// TODO Auto-generated method stub
return null;
}
@Override
public <C> Future<C> write(C context, ByteBuffer buffer) throws IOException
{
// TODO Auto-generated method stub
return null;
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
{
if (baseConnection.isOutputClosed())
{
return; // output is closed
}
assertBaseConnectionOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
LOG.debug("write(byte[],{},{})",offset,len);
}
// Delegate the application called write to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
frame.setFin(true);
output(context,callback,frame);
return outgoingFrame(frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
public Future<SendResult> write(ByteBuffer buffer) throws IOException
{
if (baseConnection.isOutputClosed())
{
return; // output is closed
}
assertBaseConnectionOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer));
LOG.debug("write({})",BufferUtil.toDetailString(buffer));
}
// Delegate the application called write to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
frame.setFin(true);
output(context,callback,frame);
return outgoingFrame(frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
public Future<SendResult> write(String message) throws IOException
{
if (baseConnection.isOutputClosed())
{
return; // output is closed
}
assertBaseConnectionOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},message.length:{})",callback,message.length());
LOG.debug("write(message.length:{})",message.length());
}
// Delegate the application called ping to the OutgoingFrames interface to allow
// extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.text(message);
frame.setFin(true);
output(context,callback,frame);
return outgoingFrame(frame);
}
@Override
public <C> Future<C> write(C context, String message) throws IOException
private void assertBaseConnectionOpen() throws IOException
{
// TODO Auto-generated method stub
return null;
if (baseConnection.isOutputClosed())
{
throw new IOException("Connection is closed");
}
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.message;
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.nio.ByteBuffer;

View File

@ -16,14 +16,14 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.message;
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.io.event.AnnotatedEventDriver;
import org.eclipse.jetty.websocket.common.events.AnnotatedEventDriver;
/**
* Support class for reading binary message data as an InputStream.

View File

@ -16,14 +16,14 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.message;
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.common.io.event.AnnotatedEventDriver;
import org.eclipse.jetty.websocket.common.events.AnnotatedEventDriver;
/**
* Support class for reading text message data as an Reader.

View File

@ -16,14 +16,14 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.message;
package org.eclipse.jetty.websocket.common.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.common.io.event.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
public class SimpleBinaryMessage implements MessageAppender
{

View File

@ -16,13 +16,13 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.message;
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.common.io.event.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
public class SimpleTextMessage implements MessageAppender
{

View File

@ -16,11 +16,11 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
public class AdapterConnectCloseSocket extends WebSocketAdapter
{

View File

@ -16,14 +16,14 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedBinaryArraySocket

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import java.io.InputStream;
@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedBinaryStreamSocket

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
@ -24,7 +24,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedFramesSocket

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import java.io.InputStream;
import java.io.Reader;
@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedStreamingSocket

View File

@ -16,14 +16,14 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedTextSocket

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import java.io.Reader;
@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
@WebSocket
public class AnnotatedTextStreamSocket

View File

@ -16,12 +16,12 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples;
package examples;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.core.io.event.EventCapture;
import org.eclipse.jetty.websocket.common.events.EventCapture;
public class ListenerBasicSocket implements WebSocketListener
{

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples.echo;
package examples.echo;
import java.io.IOException;

View File

@ -16,11 +16,10 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples.echo;
package examples.echo;
import java.io.IOException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@ -40,7 +39,7 @@ public class AnnotatedEchoSocket
}
try
{
conn.write(null,new FutureCallback<Void>(),message);
conn.write(message);
}
catch (IOException e)
{

View File

@ -16,14 +16,15 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.examples.echo;
package examples.echo;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketListener;
@ -70,9 +71,8 @@ public class ListenerEchoSocket implements WebSocketListener
try
{
String context = null;
Callback<String> callback = new FutureCallback<>();
outbound.write(context,callback,message);
@SuppressWarnings("unused")
Future<SendResult> future = outbound.write(message);
}
catch (IOException e)
{

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.TypeUtil;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,7 +16,9 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
@ -26,18 +28,22 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.IncomingFrames;
import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class IncomingFramesCapture implements IncomingFrames
public class IncomingFramesCapture extends AbstractJettyFrameHandler implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
private LinkedList<WebSocketException> errors = new LinkedList<>();
public IncomingFramesCapture()
{
super(null); // End of Chain (don't pass frames to anyone else)
}
public void assertErrorCount(int expectedCount)
{
Assert.assertThat("Captured error count",errors.size(),is(expectedCount));
@ -121,14 +127,20 @@ public class IncomingFramesCapture implements IncomingFrames
}
@Override
public void incoming(WebSocketException e)
public void handleJettyFrame(WebSocketFrame frame)
{
incomingFrame(frame);
}
@Override
public void incomingError(WebSocketException e)
{
LOG.debug(e);
errors.add(e);
}
@Override
public void incoming(WebSocketFrame frame)
public void incomingFrame(WebSocketFrame frame)
{
WebSocketFrame copy = new WebSocketFrame(frame);
frames.add(copy);

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,34 +16,32 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.OutgoingFrames;
import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class OutgoingFramesCapture implements OutgoingFrames
public class OutgoingFramesCapture extends AbstractJettyFrameHandler implements OutgoingFrames
{
public static class Write<C>
{
public C context;
public Callback<C> callback;
public WebSocketFrame frame;
}
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
private LinkedList<Write<?>> writes = new LinkedList<>();
public OutgoingFramesCapture()
{
super(null); // Do not forward frames anywhere else, capture is the end of the line
}
public void assertFrameCount(int expectedCount)
{
Assert.assertThat("Captured frame count",writes.size(),is(expectedCount));
Assert.assertThat("Captured frame count",frames.size(),is(expectedCount));
}
public void assertHasFrame(byte op)
@ -58,26 +56,25 @@ public class OutgoingFramesCapture implements OutgoingFrames
public void assertHasNoFrames()
{
Assert.assertThat("Has no frames",writes.size(),is(0));
Assert.assertThat("Has no frames",frames.size(),is(0));
}
public void dump()
{
System.out.printf("Captured %d outgoing writes%n",writes.size());
for (int i = 0; i < writes.size(); i++)
System.out.printf("Captured %d outgoing writes%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
Write<?> write = writes.get(i);
System.out.printf("[%3d] %s | %s | %s%n",i,write.context,write.callback,write.frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(write.frame.getPayload()));
WebSocketFrame frame = frames.get(i);
System.out.printf("[%3d] %s%n",i,frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
public int getFrameCount(byte op)
{
int count = 0;
for (Write<?> write : writes)
for (WebSocketFrame frame : frames)
{
WebSocketFrame frame = write.frame;
if (frame.getOpCode() == op)
{
count++;
@ -86,18 +83,22 @@ public class OutgoingFramesCapture implements OutgoingFrames
return count;
}
public LinkedList<Write<?>> getWrites()
public LinkedList<WebSocketFrame> getFrames()
{
return writes;
return frames;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
public void handleJettyFrame(WebSocketFrame frame)
{
Write<C> write = new Write<C>();
write.context = context;
write.callback = callback;
write.frame = frame;
writes.add(write);
outgoingFrame(frame);
}
@Override
public Future<SendResult> outgoingFrame(WebSocketFrame frame)
{
frames.add(frame);
return null; // FIXME: should return completed future.
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
@ -24,25 +24,27 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import javax.net.websocket.SendResult;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractJettyFrameHandler;
import org.eclipse.jetty.websocket.common.io.OutgoingFrames;
import org.junit.Assert;
/**
* Capture outgoing network bytes.
*/
public class OutgoingNetworkBytesCapture implements OutgoingFrames
public class OutgoingNetworkBytesCapture extends AbstractJettyFrameHandler implements OutgoingFrames
{
private final Generator generator;
private List<ByteBuffer> captured;
public OutgoingNetworkBytesCapture(Generator generator)
{
super(null); // no sub handling, capture is end of the line.
this.generator = generator;
this.captured = new ArrayList<>();
}
@ -61,10 +63,24 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
public void handleJettyFrame(WebSocketFrame frame)
{
try
{
outgoingFrame(frame);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@Override
public Future<SendResult> outgoingFrame(WebSocketFrame frame) throws IOException
{
ByteBuffer buf = generator.generate(frame);
captured.add(buf.slice());
callback.completed(context);
return null; // FIXME: should return completed future.
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -24,7 +24,6 @@ import java.util.Arrays;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.junit.Test;
public class RFC6455ExamplesGeneratorTest

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.Arrays;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.List;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.protocol;
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
@ -29,7 +29,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.junit.BeforeClass;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

View File

@ -16,19 +16,19 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.protocol.UnitGenerator;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,20 +16,20 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.protocol.UnitGenerator;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -25,13 +25,13 @@ import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.protocol.UnitGenerator;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.util.ArrayList;
import java.util.Collection;
@ -26,8 +26,8 @@ import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.protocol.UnitGenerator;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
@ -24,8 +24,8 @@ import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.ab;
package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -26,14 +26,14 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.core.ByteBufferAssert;
import org.eclipse.jetty.websocket.core.protocol.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.protocol.UnitGenerator;
import org.junit.Assert;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import java.io.InputStream;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import java.io.IOException;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import java.io.IOException;

View File

@ -16,11 +16,10 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.annotations;
package org.eclipse.jetty.websocket.common.annotations;
import java.io.IOException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@ -41,7 +40,7 @@ public class MyStatelessEchoSocket
{
try
{
conn.write(null,new FutureCallback<Void>(),text);
conn.write(text);
}
catch (IOException e)
{

Some files were not shown because too many files have changed in this diff Show More