Refactoring out the byte[] array in favor of ByteBuffer in frame (to reduce number of data copies)

This commit is contained in:
Joakim Erdfelt 2012-07-09 15:26:05 -07:00
parent 3ae9548c43
commit 5f6d7ec596
6 changed files with 63 additions and 101 deletions

View File

@ -87,5 +87,5 @@ public interface WebSocketConnection
* NIO style with callbacks, allows for concurrent results of the entire write operation. (Callback is only called once at the end of processing all of the
* messages)
*/
<C> void write(C context, Callback<C> callback, String... messages) throws IOException;
<C> void write(C context, Callback<C> callback, String message) throws IOException;
}

View File

@ -37,7 +37,7 @@ public class WebSocketBlockingConnection
}
this.bufferPool = this.conn.getBufferPool();
this.policy = conn.getPolicy();
this.generator = new Generator(this.policy);
this.generator = new Generator(this.policy,bufferPool);
}
/**

View File

@ -1,63 +0,0 @@
package org.eclipse.jetty.websocket.api.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.io.RawConnection;
import org.eclipse.jetty.websocket.protocol.FrameBuilder;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@Deprecated
public class WebSocketPing
{
private Raw Connection conn;
private ByteBufferPool bufferPool;
private WebSocketPolicy policy;
private Generator generator;
public WebSocketPing(WebSocketConnection conn)
{
if (conn instanceof RawConnection)
{
this.conn = (RawConnection)conn;
}
else
{
throw new IllegalArgumentException("WebSocketConnection must implement internal RawConnection interface");
}
this.bufferPool = this.conn.getBufferPool();
this.policy = conn.getPolicy();
this.generator = new Generator(this.policy);
}
public void sendPing(byte data[]) throws IOException
{
WebSocketFrame frame = FrameBuilder.ping().payload(data).asFrame();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
try
{
generator.generate(buf,frame);
FutureCallback<Void> blocking = new FutureCallback<>();
this.conn.writeRaw(null,blocking,buf);
blocking.get(); // block till finished sending?
}
catch (InterruptedException e)
{
throw new IOException("Blocking write failed",e);
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
finally
{
bufferPool.release(buf);
}
}
}

View File

@ -75,7 +75,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
{
bufferPool.release(buffer);
if (frame.available() > 0)
if (frame.remaining() > 0)
{
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
@ -118,7 +118,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private final Callback<C> callback;
protected final C context;
protected final WebSocketFrame frame;
protected final ByteBuffer buffer;
protected ByteBuffer buffer;
// Task used to timeout the bytes
protected volatile ScheduledFuture<?> task;
@ -206,7 +206,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
super(endp,executor);
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy);
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy);
this.scheduler = scheduler;
this.extensions = new ArrayList<>();

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket.protocol;
import java.nio.ByteBuffer;
/**
* The immutable frame details.
* <p>
@ -11,10 +13,7 @@ public interface Frame
public OpCode getOpCode();
/**
* @return a copy of the payload data
*/
public byte[] getPayloadData();
public ByteBuffer getPayload();
public int getPayloadLength();

View File

@ -1,9 +1,8 @@
package org.eclipse.jetty.websocket.protocol;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
/**
@ -42,7 +41,7 @@ public class WebSocketFrame implements Frame
private OpCode opcode = null;
private boolean masked = false;
private byte mask[];
private byte payload[];
private ByteBuffer data;
private boolean continuation = false;
private int continuationIndex = 0;
@ -100,13 +99,6 @@ public class WebSocketFrame implements Frame
}
}
@Override
public WebSocketFrame clone()
{
// TODO: impl
return null;
}
/**
* The number of fragments this frame consists of.
* <p>
@ -137,39 +129,34 @@ public class WebSocketFrame implements Frame
return opcode;
}
@Override
public ByteBuffer getPayload()
{
return payload.slice();
return data.slice();
}
public String getPayloadAsUTF8()
{
if (payload == null)
if (data == null)
{
return null;
}
return StringUtil.toUTF8String(payload,0,payload.length);
}
@Override
public byte[] getPayloadData()
{
return payload;
return BufferUtil.toUTF8String(data);
}
@Override
public int getPayloadLength()
{
if (payload == null)
if (data == null)
{
return 0;
}
return payload.length;
return data.remaining();
}
public boolean hasPayload()
{
return payload != null;
return ((data != null) && (data.remaining() > 0));
}
public boolean isContinuation()
@ -212,6 +199,13 @@ public class WebSocketFrame implements Frame
return rsv3;
}
public int remaining() {
if(data == null) {
return 0;
}
return data.remaining();
}
public void reset()
{
fin = false;
@ -220,7 +214,7 @@ public class WebSocketFrame implements Frame
rsv3 = false;
opcode = null;
masked = false;
payload = null;
data = null;
mask = null;
continuationIndex = 0;
continuation = false;
@ -267,7 +261,7 @@ public class WebSocketFrame implements Frame
{
if (buf == null)
{
payload = null;
data = null;
return;
}
@ -278,9 +272,11 @@ public class WebSocketFrame implements Frame
throw new ProtocolException("Control Payloads can not exceed 125 bytes in length.");
}
}
int len = buf.length;
payload = new byte[len];
System.arraycopy(buf,0,payload,0,len);
data = ByteBuffer.allocate(len);
BufferUtil.clearToFill(data);
data.put(buf,0,len);
}
/**
@ -293,7 +289,7 @@ public class WebSocketFrame implements Frame
{
if (buf == null)
{
payload = null;
data = null;
return;
}
@ -305,8 +301,38 @@ public class WebSocketFrame implements Frame
}
}
payload = new byte[len];
System.arraycopy(buf,offset,payload,0,len);
data = ByteBuffer.allocate(len);
BufferUtil.clearToFill(data);
data.put(buf,0,len);
}
/**
* Set the data payload.
* <p>
* The provided buffer will be used as is, no copying of bytes performed.
* <p>
* The provided buffer should be flipped and ready to READ from.
*
* @param buf
* the bytebuffer to set
*/
public void setPayload(ByteBuffer buf)
{
if (buf == null)
{
data = null;
return;
}
if (opcode.isControlFrame())
{
if (buf.remaining() > WebSocketFrame.MAX_CONTROL_PAYLOAD)
{
throw new ProtocolException("Control Payloads can not exceed 125 bytes in length.");
}
}
data = buf.slice();
}
public void setRsv1(boolean rsv1)