Starting removal of WebSocketPolicy.getBufferSize()

This commit is contained in:
Joakim Erdfelt 2012-09-14 10:41:11 -07:00
parent 44b963e7b8
commit c7404731c9
15 changed files with 66 additions and 76 deletions

View File

@ -127,8 +127,7 @@ public class UpgradeConnection extends AbstractConnection
@Override
public void onFillable()
{
int bufSize = client.getPolicy().getBufferSize();
ByteBuffer buffer = bufferPool.acquire(bufSize,false);
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
try
@ -156,7 +155,7 @@ public class UpgradeConnection extends AbstractConnection
/**
* Read / Parse the waiting read/fill buffer
*
*
* @param buffer
* the buffer to fill into from the endpoint
* @return true if there is more to read, false if reading should stop

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@ -29,17 +31,15 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
public class ServerReadThread extends Thread
{
private static final int BUFFER_SIZE = 8192;
private static final Logger LOG = Log.getLogger(ServerReadThread.class);
private final ServerConnection conn;
private boolean active = true;
@ -72,8 +72,7 @@ public class ServerReadThread extends Thread
public void run()
{
ByteBufferPool bufferPool = conn.getBufferPool();
WebSocketPolicy policy = conn.getPolicy();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
int len = 0;

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client.blockhead;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -62,9 +64,6 @@ import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* A overly simplistic websocket server used during testing.
* <p>
@ -74,6 +73,7 @@ public class BlockheadServer
{
public static class ServerConnection implements IncomingFrames, OutgoingFrames
{
private final int BUFFER_SIZE = 8192;
private final Socket socket;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
@ -96,7 +96,7 @@ public class BlockheadServer
this.socket = socket;
this.incomingFrames = new IncomingFramesCapture();
this.policy = WebSocketPolicy.newServerPolicy();
this.bufferPool = new MappedByteBufferPool(policy.getBufferSize());
this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE);
this.parser = new Parser(policy);
this.parseCount = new AtomicInteger(0);
this.generator = new Generator(policy,bufferPool,false);
@ -242,7 +242,7 @@ public class BlockheadServer
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
try
{

View File

@ -73,6 +73,8 @@ public class WebSocketPolicy
*/
private int bufferSize = 65536;
// TODO: change bufferSize to windowSize for FrameBytes logic?
/**
* The time in ms (milliseconds) that a websocket may be idle before closing.
* <p>

View File

@ -222,7 +222,7 @@ public class WebSocketEventDriver implements IncomingFrames
}
else
{
activeMessage = new SimpleBinaryMessage(websocket,events.onBinary,session,bufferPool,policy);
activeMessage = new SimpleBinaryMessage(websocket,events.onBinary,session,policy);
}
}

View File

@ -153,6 +153,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
ByteBuffer buffer = null;
synchronized (queue)
{
LOG.debug(".flush() - flushing={} - queue.size = {}",flushing,queue.size());
if (flushing || queue.isEmpty())
{
return;
@ -167,6 +169,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return;
}
LOG.debug("Next FrameBytes: {}",frameBytes);
buffer = frameBytes.getByteBuffer();
if (buffer == null)
@ -175,15 +179,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
flushing = true;
if (LOG.isDebugEnabled())
{
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
}
if (connectionState != BaseConnection.State.CLOSED)
{
write(buffer,frameBytes);
if (connectionState != BaseConnection.State.CLOSED)
{
write(buffer,frameBytes);
}
}
}
@ -241,7 +246,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
@Override
public State getState()
public BaseConnection.State getState()
{
return connectionState;
}
@ -275,7 +280,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onFillable()
{
LOG.debug("{} onFillable()",policy.getBehavior());
ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
try

View File

@ -47,13 +47,16 @@ public class DataFrameBytes<C> extends FrameBytes<C>
if (frame.remaining() > 0)
{
LOG.debug("More to send");
// We have written a partial frame per windowing size.
// We need to keep the correct ordering of frames, to avoid that another
// Data frame for the same stream is written before this one is finished.
connection.getQueue().prepend(this);
connection.complete(this);
}
else
{
LOG.debug("Send complete");
super.completed(context);
}
connection.flush();
@ -64,9 +67,7 @@ public class DataFrameBytes<C> extends FrameBytes<C>
{
try
{
int windowSize = connection.getPolicy().getBufferSize();
// TODO: windowSize should adjust according to some sort of flow control rules.
int windowSize = connection.getInputBufferSize();
buffer = connection.getGenerator().generate(windowSize,frame);
return buffer;
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.websocket.io.WebSocketSession;
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
private static final int BUFFER_SIZE = 65535;
/**
* Threshold (of bytes) to perform compaction at
*/
@ -55,7 +56,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
this.session = session;
this.bufferPool = bufferPool;
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
this.buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(this.buf);
size = 0;
readPosition = this.buf.position();

View File

@ -18,10 +18,10 @@
package org.eclipse.jetty.websocket.io.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.EventMethod;
@ -29,24 +29,22 @@ import org.eclipse.jetty.websocket.io.WebSocketSession;
public class SimpleBinaryMessage implements MessageAppender
{
private static final int BUFFER_SIZE = 65535;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final ByteBuffer buf;
private final ByteArrayOutputStream out;
private int size;
private boolean finished;
public SimpleBinaryMessage(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
public SimpleBinaryMessage(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
{
this.websocket = websocket;
this.onEvent = onEvent;
this.session = session;
this.bufferPool = bufferPool;
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(this.buf);
this.out = new ByteArrayOutputStream(BUFFER_SIZE);
finished = false;
}
@ -67,26 +65,14 @@ public class SimpleBinaryMessage implements MessageAppender
policy.assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
// TODO: grow buffer till max binary message size?
BufferUtil.put(payload,buf);
BufferUtil.writeTo(payload,out);
}
@Override
public void messageComplete()
{
BufferUtil.flipToFlush(this.buf,0);
finished = true;
try
{
// notify event
byte data[] = BufferUtil.toArray(this.buf);
this.onEvent.call(websocket,session,data,0,data.length);
}
finally
{
// release buffer (we are done with it now)
bufferPool.release(this.buf);
}
byte data[] = out.toByteArray();
this.onEvent.call(websocket,session,data,0,data.length);
}
}

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
/**
@ -59,7 +60,7 @@ public class Generator
*/
public static final int OVERHEAD = 28;
private final WebSocketPolicy policy;
private final WebSocketBehavior behavior;
private final ByteBufferPool bufferPool;
private boolean validating;
@ -95,7 +96,7 @@ public class Generator
*/
public Generator(WebSocketPolicy policy, ByteBufferPool bufferPool, boolean validating)
{
this.policy = policy;
this.behavior = policy.getBehavior();
this.bufferPool = bufferPool;
this.validating = validating;
}
@ -177,7 +178,7 @@ public class Generator
if (LOG.isDebugEnabled())
{
StringBuilder dbg = new StringBuilder();
dbg.append(policy.getBehavior());
dbg.append(behavior);
dbg.append(" Generate.Frame[");
dbg.append("opcode=").append(frame.getOpCode());
dbg.append(",fin=").append(frame.isFin());
@ -387,7 +388,7 @@ public class Generator
{
StringBuilder builder = new StringBuilder();
builder.append("Generator[");
builder.append(policy.getBehavior());
builder.append(behavior);
if (validating)
{
builder.append(",validating");

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
@ -34,8 +36,6 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class GeneratorParserRoundtripTest
{
@Test
@ -50,7 +50,7 @@ public class GeneratorParserRoundtripTest
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer out = bufferPool.acquire(8192,false);
try
{
// Generate Buffer
@ -88,7 +88,7 @@ public class GeneratorParserRoundtripTest
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer out = bufferPool.acquire(8192,false);
try
{
// Setup Frame
@ -100,7 +100,7 @@ public class GeneratorParserRoundtripTest
frame.setMask(mask);
// Generate Buffer
out = gen.generate(policy.getBufferSize(),frame);
out = gen.generate(8192,frame);
// Parse Buffer
parser.parse(out);

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.server.ab;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@ -38,8 +40,6 @@ import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
/**
* Fuzzing utility for the AB tests.
*/
@ -73,7 +73,6 @@ public class Fuzzer
int bigMessageSize = 20 * MBYTE;
policy.setBufferSize(bigMessageSize);
policy.setMaxPayloadSize(bigMessageSize);
policy.setMaxTextMessageSize(bigMessageSize);
policy.setMaxBinaryMessageSize(bigMessageSize);
@ -142,6 +141,8 @@ public class Fuzzer
prefix = "Frame[" + i + "]";
LOG.debug("{} {}",prefix,actual);
Assert.assertThat(prefix + ".opcode",OpCode.name(actual.getOpCode()),is(OpCode.name(expected.getOpCode())));
prefix += "/" + actual.getOpCode();
if (expected.getOpCode() == OpCode.CLOSE)

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.server.ab;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
@ -273,7 +274,7 @@ public class TestABCase1 extends AbstractABCase
fuzzer.setSendMode(SendMode.SLOW);
fuzzer.setSlowSendSegmentSize(segmentSize);
fuzzer.send(send);
fuzzer.expect(expect);
fuzzer.expect(expect,TimeUnit.SECONDS,2);
}
finally
{

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.websocket.server.blockhead;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
@ -39,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.HttpsURLConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -65,12 +69,6 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
/**
* A simple websocket client for performing unit tests with.
* <p>
@ -85,13 +83,13 @@ import static org.junit.Assert.assertEquals;
*/
public class BlockheadClient implements IncomingFrames, OutgoingFrames
{
private static final int BUFFER_SIZE = 8192;
private static final Logger LOG = Log.getLogger(BlockheadClient.class);
/** Set to true to disable timeouts (for debugging reasons) */
private boolean debug = false;
private final URI destHttpURI;
private final URI destWebsocketURI;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final Generator generator;
private final Parser parser;
private final IncomingFramesCapture incomingFrames;
@ -126,8 +124,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
}
this.destHttpURI = new URI(scheme,destWebsocketURI.getSchemeSpecificPart(),destWebsocketURI.getFragment());
this.policy = policy;
this.bufferPool = new MappedByteBufferPool(policy.getBufferSize());
this.bufferPool = new MappedByteBufferPool(8192);
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy);
this.parseCount = new AtomicInteger(0);
@ -398,7 +395,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
try
{

View File

@ -1,17 +1,14 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
org.eclipse.jetty.server.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN
org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# Disable stacks on FrameBytes.failed()
org.eclipse.jetty.websocket.io.FrameBytes.STACKS=false
# See the read/write traffic
# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.io.FrameBytes.LEVEL=OFF
# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG