Splitting out new WebSocketSession from WebSocketAsyncConnection to better support extensions (see diagrams)

This commit is contained in:
Joakim Erdfelt 2012-07-16 15:49:12 -07:00
parent 10eeb3f6cf
commit 6c0e24485a
13 changed files with 177 additions and 205 deletions

View File

@ -16,6 +16,8 @@
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
@ -106,16 +108,21 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
* @param frame
* the frame to send to the next output
*/
public void nextOutput(WebSocketFrame frame)
public <C> void nextOutput(C context, Callback<C> callback, WebSocketFrame frame)
{
nextOutgoingFrames.output(frame);
nextOutgoingFrames.output(context,callback,frame);
}
public <C> void nextOutputNoCallback(WebSocketFrame frame)
{
nextOutgoingFrames.output(null,new FutureCallback<Void>(),frame);
}
@Override
public void output(WebSocketFrame frame)
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
// pass thru, un-modified
nextOutgoingFrames.output(frame);
nextOutgoingFrames.output(context,callback,frame);
}
public void setBufferPool(ByteBufferPool bufferPool)

View File

@ -20,7 +20,6 @@ import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.io.DataFrameBytes;
import org.eclipse.jetty.websocket.io.RawConnection;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -56,8 +55,7 @@ public class WebSocketBlockingConnection
try
{
FutureCallback<Void> blocking = new FutureCallback<>();
DataFrameBytes<Void> bytes = new DataFrameBytes<>(conn,blocking,null,frame);
this.conn.getQueue().append(bytes);
conn.output(null,blocking,frame);
blocking.get(); // block till finished
}
catch (InterruptedException e)
@ -81,8 +79,7 @@ public class WebSocketBlockingConnection
try
{
FutureCallback<Void> blocking = new FutureCallback<>();
DataFrameBytes<Void> bytes = new DataFrameBytes<>(conn,blocking,null,frame);
this.conn.getQueue().append(bytes);
conn.output(null,blocking,frame);
blocking.get(); // block till finished
}
catch (InterruptedException e)

View File

@ -167,7 +167,7 @@ public class WebSocketEventDriver implements IncomingFrames
{
WebSocketFrame pong = new WebSocketFrame(OpCode.PONG);
pong.setPayload(frame.getPayload());
connection.write(null,new FutureCallback<Void>(),pong);
connection.output(null,new FutureCallback<Void>(),pong);
break;
}
case BINARY:

View File

@ -21,6 +21,7 @@ import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BadPayloadException;
@ -65,8 +66,8 @@ public class DeflateFrameExtension extends Extension
case PONG:
if (len > WebSocketFrame.MAX_CONTROL_PAYLOAD)
{
throw new ProtocolException("Invalid control frame payload length, [" + len + "] cannot exceed ["
+ WebSocketFrame.MAX_CONTROL_PAYLOAD + "]");
throw new ProtocolException("Invalid control frame payload length, [" + len + "] cannot exceed [" + WebSocketFrame.MAX_CONTROL_PAYLOAD
+ "]");
}
break;
}
@ -122,19 +123,19 @@ public class DeflateFrameExtension extends Extension
}
@Override
public void output(WebSocketFrame frame)
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
{
// skip, cannot compress control frames.
super.output(frame);
nextOutput(context,callback,frame);
return;
}
if (frame.getPayloadLength() < minLength)
{
// skip, frame too small to care compressing it.
super.output(frame);
nextOutput(context,callback,frame);
return;
}
@ -178,7 +179,7 @@ public class DeflateFrameExtension extends Extension
frame.setPayload(out);
frame.setRsv1(deflater.finished());
nextOutput(frame);
nextOutput(context,callback,frame);
// free original data buffer
getBufferPool().release(data);

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.extensions.fragment;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.OpCode;
@ -27,13 +28,14 @@ public class FragmentExtension extends Extension
private int maxLength = -1;
private int minFragments = 1;
@Override
public void output(WebSocketFrame frame)
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
{
// Cannot fragment Control Frames
getNextOutgoingFrames().output(frame);
nextOutput(context,callback,frame);
return;
}
@ -57,7 +59,7 @@ public class FragmentExtension extends Extension
payload.limit(Math.min(payload.limit() + maxLength,originalLimit));
frag.setPayload(payload);
nextOutput(frag);
nextOutputNoCallback(frag);
length -= maxLength;
opcode = OpCode.CONTINUATION;
@ -79,7 +81,7 @@ public class FragmentExtension extends Extension
frag.setFin(false);
frag.setPayload(payload);
nextOutput(frag);
nextOutputNoCallback(frag);
length -= fragLength;
opcode = OpCode.CONTINUATION;
}
@ -91,7 +93,7 @@ public class FragmentExtension extends Extension
payload.limit(originalLimit);
frag.setPayload(payload);
nextOutput(frag);
nextOutput(context,callback,frag);
}
@Override

View File

@ -26,7 +26,7 @@ public class ControlFrameBytes<C> extends FrameBytes<C>
{
private ByteBuffer buffer;
public ControlFrameBytes(RawConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
public ControlFrameBytes(WebSocketAsyncConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
{
super(connection,callback,context,frame);
}

View File

@ -26,7 +26,7 @@ public class DataFrameBytes<C> extends FrameBytes<C>
private int size;
private ByteBuffer buffer;
public DataFrameBytes(RawConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
public DataFrameBytes(WebSocketAsyncConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
{
super(connection,callback,context,frame);
}

View File

@ -25,14 +25,14 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public abstract class FrameBytes<C> implements Callback<C>, Runnable
{
protected final RawConnection connection;
protected final WebSocketAsyncConnection connection;
protected final Callback<C> callback;
protected final C context;
protected final WebSocketFrame frame;
// Task used to timeout the bytes
protected volatile ScheduledFuture<?> task;
protected FrameBytes(RawConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
protected FrameBytes(WebSocketAsyncConnection connection, Callback<C> callback, C context, WebSocketFrame frame)
{
this.connection = connection;
this.callback = callback;

View File

@ -1,5 +1,6 @@
package org.eclipse.jetty.websocket.io;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
@ -7,5 +8,5 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
*/
public interface OutgoingFrames
{
void output(WebSocketFrame frame);
<C> void output(C context, Callback<C> callback, WebSocketFrame frame);
}

View File

@ -16,42 +16,22 @@
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import java.net.InetSocketAddress;
/**
* Interface for working with connections in a raw way.
* <p>
* This is abstracted out to allow for common access to connection internals regardless of physical vs virtual connections.
*/
public interface RawConnection extends WebSocketConnection
public interface RawConnection extends OutgoingFrames
{
void close() throws IOException;
<C> void complete(FrameBytes<C> frameBytes);
void close(int statusCode, String reason) throws IOException;
void disconnect(boolean onlyOutput);
void flush();
InetSocketAddress getRemoteAddress();
ByteBufferPool getBufferPool();
Executor getExecutor();
Generator getGenerator();
Parser getParser();
WebSocketPolicy getPolicy();
FrameQueue getQueue();
<C> void write(C context, Callback<C> callback, WebSocketFrame frame);
boolean isOpen();
}

View File

@ -39,14 +39,13 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io
*/
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection, OutgoingFrames
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames
{
static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final ThreadLocal<WebSocketAsyncConnection> CURRENT_CONNECTION = new ThreadLocal<WebSocketAsyncConnection>();
@ -68,7 +67,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private final WebSocketPolicy policy;
private final FrameQueue queue;
private List<ExtensionConfig> extensions;
private OutgoingFrames outgoingFramesHandler;
private boolean flushing;
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
@ -95,7 +93,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
terminateConnection(statusCode,reason);
}
@Override
public <C> void complete(FrameBytes<C> frameBytes)
{
synchronized (queue)
@ -134,7 +131,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
}
@Override
public void flush()
{
FrameBytes<?> frameBytes = null;
@ -160,13 +156,11 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
write(buffer,this,frameBytes);
}
@Override
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
@Override
public Executor getExecutor()
{
return getExecutor();
@ -184,25 +178,21 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
return extensions;
}
@Override
public Generator getGenerator()
{
return generator;
}
@Override
public Parser getParser()
{
return parser;
}
@Override
public WebSocketPolicy getPolicy()
{
return this.policy;
}
@Override
public FrameQueue getQueue()
{
return queue;
@ -219,13 +209,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
return scheduler;
}
@Override
public String getSubProtocol()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isOpen()
{
@ -268,19 +251,25 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
fillInterested();
}
/**
* Enqueue internal frame from {@link OutgoingFrames} stack for eventual write out on the physical connection.
*/
@Override
public void output(WebSocketFrame frame)
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
// TODO Auto-generated method stub
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
if (frame.getOpCode().isControlFrame())
{
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
}
else
{
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
}
flush();
}
private void read(ByteBuffer buffer)
@ -354,80 +343,4 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
LOG.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
getEndPoint().write(frameBytes.context,frameBytes,buffer);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, byte buf[], int offset, int len) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
}
if (len == 0)
{
// nothing to write
return;
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
flush();
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer));
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
flush();
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},message.length:{})",callback,message.length());
}
WebSocketFrame frame = WebSocketFrame.text(message);
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
flush();
}
@Override
public <C> void write(C context, Callback<C> callback, WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
{
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
}
else
{
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
}
}
}

View File

@ -0,0 +1,115 @@
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private RawConnection connection;
private OutgoingFrames outgoing;
private String subprotocol;
private WebSocketPolicy policy;
@Override
public void close() throws IOException
{
connection.close();
}
@Override
public void close(int statusCode, String reason) throws IOException
{
connection.close(statusCode,reason);
}
@Override
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public InetSocketAddress getRemoteAddress()
{
return connection.getRemoteAddress();
}
@Override
public String getSubProtocol()
{
return subprotocol;
}
@Override
public boolean isOpen()
{
return connection.isOpen();
}
/**
* {@inheritDoc}
*/
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
frame.setFin(true);
outgoing.output(context,callback,frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
frame.setFin(true);
outgoing.output(context,callback,frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer));
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
frame.setFin(true);
outgoing.output(context,callback,frame);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},message.length:{})",callback,message.length());
}
WebSocketFrame frame = WebSocketFrame.text(message);
frame.setFin(true);
outgoing.output(context,callback,frame);
}
}

View File

@ -18,14 +18,10 @@ package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.rules.TestName;
@ -58,57 +54,17 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect
{
}
@Override
public <C> void complete(FrameBytes<C> frameBytes)
{
}
@Override
public void disconnect(boolean onlyOutput)
{
}
@Override
public void flush()
{
}
@Override
public ByteBufferPool getBufferPool()
{
return null;
}
@Override
public Executor getExecutor()
{
return null;
}
@Override
public Generator getGenerator()
{
return null;
}
@Override
public Parser getParser()
{
return null;
}
@Override
public WebSocketPolicy getPolicy()
{
return null;
}
@Override
public FrameQueue getQueue()
{
return null;
}
@Override
public InetSocketAddress getRemoteAddress()
{
@ -127,6 +83,11 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect
return false;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
@ -152,9 +113,4 @@ public class LocalWebSocketConnection implements RawConnection, WebSocketConnect
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
}
@Override
public <C> void write(C context, Callback<C> callback, WebSocketFrame frame)
{
}
}