Another review refact (in progress)

This commit is contained in:
Joakim Erdfelt 2012-07-09 14:42:59 -07:00
parent 6a0e3f6c72
commit 3ae9548c43
7 changed files with 219 additions and 27 deletions

View File

@ -65,6 +65,15 @@ public interface WebSocketConnection
*/
boolean isOpen();
/**
* Send a single ping messages.
* <p>
* NIO style with callbacks, allows for knowledge of successful ping send.
* <p>
* Use @OnWebSocketFrame and monitor Pong frames
*/
<C> void ping(C context, Callback<C> callback, byte payload[]) throws IOException;
/**
* Send a a binary message.
* <p>

View File

@ -13,9 +13,10 @@ import org.eclipse.jetty.websocket.io.RawConnection;
import org.eclipse.jetty.websocket.protocol.FrameBuilder;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@Deprecated
public class WebSocketPing
{
private RawConnection conn;
private Raw Connection conn;
private ByteBufferPool bufferPool;
private WebSocketPolicy policy;
private Generator generator;

View File

@ -2,6 +2,7 @@ package org.eclipse.jetty.websocket.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -38,9 +39,9 @@ public class Generator
private final FrameGenerator basicGenerator;
public Generator(WebSocketPolicy policy)
public Generator(WebSocketPolicy policy, ByteBufferPool bufferPool)
{
basicGenerator = new FrameGenerator(policy);
basicGenerator = new FrameGenerator(policy,bufferPool);
}
public ByteBuffer generate(ByteBuffer buffer, WebSocketFrame frame)
@ -57,6 +58,21 @@ public class Generator
return ret;
}
public ByteBuffer generate(int size, WebSocketFrame frame)
{
// NEW ONE - allocate and fill only to param size
// note: size is the buffer size to generate into, not related to the payload size
// note: size cannot be less than framing overhead
// TODO: update frame.setAvailable() to indicate how much of payload is left to be generated
return basicGenerator.generate(frame);
}
public ByteBuffer generate(WebSocketFrame frame)
{
// NEW ONE - allocate as much as needed
return basicGenerator.generate(frame);
}
@Override
public String toString()
{

View File

@ -3,15 +3,17 @@ package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -25,6 +27,7 @@ import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.FrameBuilder;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
@ -32,6 +35,148 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
*/
public class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, WebSocketConnection
{
public class ControlFrameBytes<C> extends FrameBytes<C>
{
public ControlFrameBytes(C context, Callback<C> callback, WebSocketFrame frame, ByteBuffer buffer)
{
super(context,callback,frame,buffer);
}
@Override
public void completed(C context) {
bufferPool.release(buffer);
super.completed(context);
if(frame.getOpCode() == OpCode.CLOSE)
{
// TODO: close the connection (no packet)
}
}
@Override
public ByteBuffer getByteBuffer()
{
return buffer;
}
}
public class DataFrameBytes<C> extends FrameBytes<C>
{
private int size;
public DataFrameBytes(C context, Callback<C> callback, WebSocketFrame frame, ByteBuffer buffer)
{
super(context,callback,frame,buffer);
}
@Override
public void completed(C context)
{
bufferPool.release(buffer);
if (frame.available() > 0)
{
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
prepend(this);
}
else
{
super.completed(context);
}
}
@Override
public ByteBuffer getByteBuffer()
{
try
{
int windowSize = policy.getBufferSize();
// TODO: create a window size?
size = frame.getPayloadLength();
if (size > windowSize)
{
size = windowSize;
}
buffer = generator.generate(size,frame);
return buffer;
}
catch (Throwable x)
{
failed(context,x);
return null;
}
}
}
public abstract class FrameBytes<C> implements Callback<C>, Runnable
{
private final Callback<C> callback;
protected final C context;
protected final WebSocketFrame frame;
protected final ByteBuffer buffer;
// Task used to timeout the bytes
protected volatile ScheduledFuture<?> task;
protected FrameBytes(C context, Callback<C> callback, WebSocketFrame frame, ByteBuffer buffer)
{
this.callback = callback;
this.context = context;
this.frame = frame;
this.buffer = buffer;
}
private void cancelTask()
{
ScheduledFuture<?> task = this.task;
if (task != null)
{
task.cancel(false);
}
}
@Override
public void completed(C context)
{
cancelTask();
callback.completed(context);
}
@Override
public void failed(C context, Throwable x)
{
cancelTask();
callback.failed(context,x);
}
public abstract ByteBuffer getByteBuffer();
@Override
public void run()
{
// If this occurs we had a timeout!
try
{
close();
}
catch (IOException e)
{
LOG.ignore(e);
}
failed(context, new InterruptedByTimeoutException());
}
@Override
public String toString()
{
return frame.toString();
}
}
private static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final ThreadLocal<WebSocketAsyncConnection> CURRENT_CONNECTION = new ThreadLocal<WebSocketAsyncConnection>();
@ -46,21 +191,24 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
private final ByteBufferPool bufferPool;
private Generator generator;
private Parser parser;
private WebSocketPolicy policy;
private final ScheduledExecutorService scheduler;
private final Generator generator;
private final Parser parser;
private final WebSocketPolicy policy;
// TODO: track extensions? (only those that need to operate at this level?)
// TODO: are extensions going to layer the endpoint?
// TODO: are extensions going to layer the connection?
private List<ExtensionConfig> extensions;
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, WebSocketPolicy policy, ByteBufferPool bufferPool)
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor);
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy);
this.parser = new Parser(policy);
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
}
@ -85,7 +233,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
catch (IOException e)
{
terminateConnection(StatusCode.PROTOCOL,e.getMessage());
throw new RuntimeIOException(e);
return 0;
}
}
@ -156,13 +304,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
LOG.debug("onFillable");
setCurrentConnection(this);
ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clear(buffer);
BufferUtil.clearToFill(buffer);
try
{
read(buffer);
}
finally
{
// TODO: does fillInterested need to be called again?
fillInterested();
setCurrentConnection(null);
bufferPool.release(buffer);
}
@ -176,6 +326,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
fillInterested();
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
WebSocketFrame frame = FrameBuilder.ping().payload(payload).asFrame();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
generator.generate(buf,frame);
writeRaw(context,callback,buf);
}
private void read(ByteBuffer buffer)
{
while (true)
@ -264,28 +423,22 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, String... messages) throws IOException
public <C> void write(C context, Callback<C> callback, String message) throws IOException
{
int len = messages.length;
if (len == 0)
{
// nothing to write
return;
}
if (LOG.isDebugEnabled())
{
LOG.debug("write(context,{},Strings->{})",callback,messages.length);
LOG.debug("write(context,{},message.length:{})",callback,message.length());
}
ByteBuffer raw[] = new ByteBuffer[messages.length];
for (int i = 0; i < len; i++)
{
WebSocketFrame frame = FrameBuilder.text(messages[i]).fin(true).asFrame();
raw[i] = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(raw[i]);
generator.generate(raw[i],frame);
BufferUtil.flipToFlush(raw[i],0);
}
WebSocketFrame frame = FrameBuilder.text(message).fin(true).asFrame();
FrameBytes bytes = new FrameBytes<C>(callback,context,frame,buffer);
WebSocketFrame frame = FrameBuilder.text(messages[i]).fin(true).asFrame();
raw[i] = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(raw[i]);
generator.generate(raw[i],frame);
BufferUtil.flipToFlush(raw[i],0);
writeRaw(context,callback,raw);
}

View File

@ -113,6 +113,7 @@ public class FrameParser
if (frame.isMasked())
{
// Demask the content 1 byte at a time
// FIXME: on partially parsed frames this needs an offset from prior parse
byte mask[] = getFrame().getMask();
for (int i = 0; i < amt; i++)
{

View File

@ -2,6 +2,8 @@ package org.eclipse.jetty.websocket.protocol;
/**
* The immutable frame details.
* <p>
* Used by end user via @OnWebSocketFrame
*/
public interface Frame
{
@ -9,6 +11,9 @@ public interface Frame
public OpCode getOpCode();
/**
* @return a copy of the payload data
*/
public byte[] getPayloadData();
public int getPayloadLength();

View File

@ -1,6 +1,8 @@
package org.eclipse.jetty.websocket.protocol;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
@ -135,6 +137,11 @@ public class WebSocketFrame implements Frame
return opcode;
}
public ByteBuffer getPayload()
{
return payload.slice();
}
public String getPayloadAsUTF8()
{
if (payload == null)