Fixing buffer flip logic

This commit is contained in:
Joakim Erdfelt 2012-06-29 13:00:08 -07:00
parent ce90b8581b
commit 3fa89ea577
6 changed files with 35 additions and 9 deletions

View File

@ -100,7 +100,8 @@ public class EventMethod
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e)
{
LOG.warn("Cannot call method {} on {} with {}",method,pojo,args,e);
LOG.warn("Cannot call method {} on {} with {}",method,pojo,args);
LOG.warn(e);
}
}

View File

@ -3,6 +3,7 @@ package org.eclipse.jetty.websocket.generator;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.OpCode;
@ -52,6 +53,10 @@ public class Generator
{ "unchecked", "rawtypes" })
public ByteBuffer generate(ByteBuffer buffer, BaseFrame frame)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Buffer: {}",BufferUtil.toDetailString(buffer));
}
FrameGenerator generator = generators.get(frame.getOpCode());
LOG.debug(generator.getClass().getSimpleName() + " active");
return generator.generate(buffer,frame);

View File

@ -240,9 +240,15 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
LOG.debug("write(BaseFrame->{})",frame);
}
ByteBuffer raw = ByteBuffer.allocate(frame.getPayloadLength() + FrameGenerator.OVERHEAD);
ByteBuffer raw = bufferPool.acquire(frame.getPayloadLength() + FrameGenerator.OVERHEAD,false);
BufferUtil.clearToFill(raw);
generator.generate(raw,frame);
BufferUtil.flipToFlush(raw,0);
Callback<Void> nop = new FutureCallback<>(); // TODO: add buffer release callback?
if (LOG.isDebugEnabled())
{
LOG.debug("Raw Buffer: {}",BufferUtil.toDetailString(raw));
}
getEndPoint().write(null,nop,raw);
}
@ -273,8 +279,10 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false);
BufferUtil.clearToFill(raw[i]);
BinaryFrame frame = new BinaryFrame(buffers[i]);
generator.generate(raw[i],frame);
BufferUtil.flipToFlush(raw[i],0);
}
Callback<Void> nop = new FutureCallback<>(); // TODO: add buffer release callback?
getEndPoint().write(null,nop,raw);
@ -297,7 +305,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(frames[i].getPayloadLength() + FrameGenerator.OVERHEAD,false);
BufferUtil.clearToFill(raw[i]);
generator.generate(raw[i],frames[i]);
BufferUtil.flipToFlush(raw[i],0);
}
getEndPoint().write(context,callback,raw);
}
@ -319,8 +329,10 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
for (int i = 0; i < len; i++)
{
raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false);
BufferUtil.clearToFill(raw[i]);
BinaryFrame frame = new BinaryFrame(buffers[i]);
generator.generate(raw[i],frame);
BufferUtil.flipToFlush(raw[i],0);
}
getEndPoint().write(context,callback,raw);
}

View File

@ -342,11 +342,8 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
endp.setAsyncConnection(connection);
connection.getParser().addListener(websocket);
LOG.debug("EndPoint: {}",endp);
LOG.debug("Connection: {}",connection);
// Notify POJO of connection
websocket.setConnection(connection);
LOG.debug("HttpConnection: {}",http);
LOG.debug("AsyncWebSocketConnection: {}",connection);
// Initialize / Negotiate Extensions
List<Extension> extensions = initExtensions(request.getExtensions());
@ -355,12 +352,14 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
LOG.debug("Handshake Response: {}",handshaker);
handshaker.doHandshakeResponse(request,response,extensions);
connection.fillInterested();
LOG.debug("EndPoint: {}",endp);
LOG.debug("Handshake Complete: {}",connection);
// Add connection
addConnection(connection);
// Notify POJO of connect
// Notify POJO of connection
websocket.setConnection(connection);
websocket.onConnect();
// Tell jetty about the new connection

View File

@ -174,6 +174,7 @@ public class BlockheadClient implements Parser.Listener
@Override
public void onFrame(BaseFrame frame)
{
LOG.debug("onFrame({})",frame);
if (!incomingFrameQueue.offerLast(frame))
{
throw new RuntimeException("Unable to queue incoming frame: " + frame);
@ -204,12 +205,17 @@ public class BlockheadClient implements Parser.Listener
long expireOn = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
try
{
int len = 0;
while (incomingFrameQueue.size() < (startCount + expectedCount))
{
if (read(buf) > 0)
len = read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
parser.parse(buf);
}
if (System.currentTimeMillis() > expireOn)
@ -318,6 +324,7 @@ public class BlockheadClient implements Parser.Listener
public void write(BaseFrame frame) throws IOException
{
LOG.debug("write(BaseFrame->{})",frame);
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
try
{
@ -334,6 +341,7 @@ public class BlockheadClient implements Parser.Listener
public void write(String str) throws IOException
{
LOG.debug("write(String->{})",str);
out.write(StringUtil.getBytes(str,StringUtil.__ISO_8859_1));
}
}

View File

@ -1,3 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=DEBUG
org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=DEBUG