jetty-9 more robust test

This commit is contained in:
Greg Wilkins 2012-05-08 19:44:35 +02:00
parent 3c066d7a58
commit bc7e42e31f
15 changed files with 40 additions and 808 deletions

View File

@ -1,144 +0,0 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
public abstract class AbstractBuffers implements Buffers
{
protected final Buffers.Type _headerType;
protected final int _headerSize;
protected final Buffers.Type _bufferType;
protected final int _bufferSize;
protected final Buffers.Type _otherType;
/* ------------------------------------------------------------ */
public AbstractBuffers(Buffers.Type headerType, int headerSize, Buffers.Type bufferType, int bufferSize, Buffers.Type otherType)
{
_headerType=headerType;
_headerSize=headerSize;
_bufferType=bufferType;
_bufferSize=bufferSize;
_otherType=otherType;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the buffer size in bytes.
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the header size in bytes.
*/
public int getHeaderSize()
{
return _headerSize;
}
/* ------------------------------------------------------------ */
/**
* Create a new header Buffer
* @return new Buffer
*/
final protected ByteBuffer newHeader()
{
switch(_headerType)
{
case DIRECT:
return BufferUtil.allocateDirect(_headerSize);
case INDIRECT:
return BufferUtil.allocate(_headerSize);
}
throw new IllegalStateException();
}
/* ------------------------------------------------------------ */
/**
* Create a new content Buffer
* @return new Buffer
*/
final protected ByteBuffer newBuffer()
{
switch(_bufferType)
{
case DIRECT:
return BufferUtil.allocateDirect(_bufferSize);
case INDIRECT:
return BufferUtil.allocate(_bufferSize);
}
throw new IllegalStateException();
}
/* ------------------------------------------------------------ */
/**
* Create a new content Buffer
* @param size
* @return new Buffer
*/
final protected ByteBuffer newBuffer(int size)
{
switch(_otherType)
{
case DIRECT:
return BufferUtil.allocateDirect(size);
case INDIRECT:
return BufferUtil.allocate(size);
}
throw new IllegalStateException();
}
/* ------------------------------------------------------------ */
/**
* @param buffer
* @return True if the buffer is the correct type to be a Header buffer
*/
public final boolean isHeader(ByteBuffer buffer)
{
if (buffer.capacity()==_headerSize)
{
switch(_headerType)
{
case DIRECT:
return buffer.isDirect();
case INDIRECT:
return !buffer.isDirect();
}
}
return false;
}
/* ------------------------------------------------------------ */
/**
* @param buffer
* @return True if the buffer is the correct type to be a Header buffer
*/
public final boolean isBuffer(ByteBuffer buffer)
{
if (buffer.capacity()==_bufferSize)
{
switch(_bufferType)
{
case DIRECT:
return buffer.isDirect();
case INDIRECT:
return !buffer.isDirect();
}
}
return false;
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("%s [%d,%d]", getClass().getSimpleName(), _headerSize, _bufferSize);
}
}

View File

@ -113,7 +113,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
}
// we are complete and ready
_writeFuture.ready();
_writeFuture.complete();
}
catch(final IOException e)
{

View File

@ -1,35 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
/* ------------------------------------------------------------ */
/** BufferSource.
* Represents a pool or other source of buffers and abstracts the creation
* of specific types of buffers (eg NIO). The concept of big and little buffers
* is supported, but these terms have no absolute meaning and must be determined by context.
*
*/
public interface Buffers
{
enum Type { DIRECT, INDIRECT }
ByteBuffer getHeader();
ByteBuffer getBuffer();
ByteBuffer getBuffer(int size);
void returnBuffer(ByteBuffer buffer);
}

View File

@ -1,11 +0,0 @@
package org.eclipse.jetty.io;
public class BuffersFactory
{
public static Buffers newBuffers(Buffers.Type headerType, int headerSize, Buffers.Type bufferType, int bufferSize, Buffers.Type otherType,int maxSize)
{
if (maxSize>=0)
return new PooledBuffers(headerType,headerSize,bufferType,bufferSize,otherType,maxSize);
return new ThreadLocalBuffers(headerType,headerSize,bufferType,bufferSize,otherType);
}
}

View File

@ -14,17 +14,17 @@ import org.eclipse.jetty.util.Callback;
/** Dispatched IOFuture.
* <p>An implementation of IOFuture that can be extended to implement the
* {@link #dispatch(Runnable)} method so that callbacks can be dispatched.
* By default, the callbacks are called by the thread that called {@link #ready()} or
* By default, the callbacks are called by the thread that called {@link #complete()} or
* {@link #fail(Throwable)}
*/
public class DispatchedIOFuture implements IOFuture
{
private final Lock _lock;
private final Condition _block;
private boolean _done;
private boolean _complete;
private boolean _ready;
private Throwable _cause;
private Callback _callback;
private Callback<?> _callback;
private Object _context;
public DispatchedIOFuture()
@ -39,8 +39,8 @@ public class DispatchedIOFuture implements IOFuture
public DispatchedIOFuture(boolean ready,Lock lock)
{
_ready=ready;
_complete=ready;
_done=ready;
_lock = lock;
_block = _lock.newCondition();
}
@ -50,11 +50,11 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
if (_complete)
if (_done)
throw new IllegalStateException("complete",cause);
_cause=cause;
_complete=true;
_done=true;
if (_callback!=null)
dispatchFailed();
@ -66,15 +66,15 @@ public class DispatchedIOFuture implements IOFuture
}
}
public void ready()
public void complete()
{
_lock.lock();
try
{
if (_complete)
if (_done)
throw new IllegalStateException();
_ready=true;
_complete=true;
_done=true;
if (_callback!=null)
dispatchCompleted();
@ -91,10 +91,10 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
if (_complete)
if (_done)
throw new IllegalStateException();
_ready=false;
_complete=true;
_complete=false;
_done=true;
_block.signal();
}
finally
@ -109,7 +109,7 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
return _complete;
return _done;
}
finally
{
@ -123,9 +123,9 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
if (_complete)
if (_done)
{
if (_ready)
if (_complete)
return true;
throw new ExecutionException(_cause);
}
@ -150,7 +150,7 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
while (!_complete)
while (!_done)
_block.await();
isComplete();
}
@ -167,7 +167,7 @@ public class DispatchedIOFuture implements IOFuture
_lock.lock();
try
{
if (!_complete)
if (!_done)
_block.await(timeout,units);
return isComplete();
}
@ -188,9 +188,9 @@ public class DispatchedIOFuture implements IOFuture
_callback=callback;
_context=context;
if (_complete)
if (_done)
{
if (_ready)
if (_complete)
dispatchCompleted();
else
dispatchFailed();
@ -210,9 +210,7 @@ public class DispatchedIOFuture implements IOFuture
private void dispatchCompleted()
{
final Callback callback=_callback;
_callback=null;
final Object context=_context;
_context=null;
dispatch(new Runnable()
{
@Override
@ -226,11 +224,8 @@ public class DispatchedIOFuture implements IOFuture
private void dispatchFailed()
{
final Callback callback=_callback;
_callback=null;
final Throwable cause=_cause;
_cause=null;
final Object context=_context;
_context=null;
dispatch(new Runnable()
{
@Override
@ -246,7 +241,7 @@ public class DispatchedIOFuture implements IOFuture
{
return String.format("DIOF@%x{%s,%s}",
hashCode(),
_complete?(_ready?"R":_cause):"-",
_done?(_complete?"R":_cause):"-",
_callback==null?"-":_callback);
}

View File

@ -1,106 +0,0 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PooledBuffers extends AbstractBuffers
{
private final Queue<ByteBuffer> _headers;
private final Queue<ByteBuffer> _buffers;
private final Queue<ByteBuffer> _others;
private final AtomicInteger _size = new AtomicInteger();
private final int _maxSize;
private final boolean _otherHeaders;
private final boolean _otherBuffers;
/* ------------------------------------------------------------ */
public PooledBuffers(Buffers.Type headerType, int headerSize, Buffers.Type bufferType, int bufferSize, Buffers.Type otherType,int maxSize)
{
super(headerType,headerSize,bufferType,bufferSize,otherType);
_headers=new ConcurrentLinkedQueue<ByteBuffer>();
_buffers=new ConcurrentLinkedQueue<ByteBuffer>();
_others=new ConcurrentLinkedQueue<ByteBuffer>();
_otherHeaders=headerType==otherType;
_otherBuffers=bufferType==otherType;
_maxSize=maxSize;
}
/* ------------------------------------------------------------ */
public ByteBuffer getHeader()
{
ByteBuffer buffer = _headers.poll();
if (buffer==null)
buffer=newHeader();
else
_size.decrementAndGet();
return buffer;
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer()
{
ByteBuffer buffer = _buffers.poll();
if (buffer==null)
buffer=newBuffer();
else
_size.decrementAndGet();
return buffer;
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer(int size)
{
if (_otherHeaders && size==getHeaderSize())
return getHeader();
if (_otherBuffers && size==getBufferSize())
return getBuffer();
// Look for an other buffer
ByteBuffer buffer = _others.poll();
// consume all other buffers until one of the right size is found
while (buffer!=null && buffer.capacity()!=size)
{
_size.decrementAndGet();
buffer = _others.poll();
}
if (buffer==null)
buffer=newBuffer(size);
else
_size.decrementAndGet();
return buffer;
}
/* ------------------------------------------------------------ */
public void returnBuffer(ByteBuffer buffer)
{
buffer.clear().limit(0);
if (buffer.isReadOnly())
return;
if (_size.incrementAndGet() > _maxSize)
_size.decrementAndGet();
else
{
if (isHeader(buffer))
_headers.add(buffer);
else if (isBuffer(buffer))
_buffers.add(buffer);
else
_others.add(buffer);
}
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("%s [%d/%d@%d,%d/%d@%d,%d/%d@-]",
getClass().getSimpleName(),
_headers.size(),_maxSize,_headerSize,
_buffers.size(),_maxSize,_bufferSize,
_others.size(),_maxSize);
}
}

View File

@ -128,7 +128,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_interestOps=0;
if (can_read && !_readFuture.isDone())
_readFuture.ready();
_readFuture.complete();
if (can_write && _writeBuffers!=null)
completeWrite();
@ -290,7 +290,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
}
// we are complete and ready
_writeFuture.ready();
_writeFuture.complete();
}
catch(final IOException e)
{

View File

@ -1,114 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
/* ------------------------------------------------------------ */
/** SimpleBuffers.
* Simple implementation of Buffers holder.
*
*
*/
public class SimpleBuffers implements Buffers
{
final ByteBuffer _header;
final ByteBuffer _buffer;
boolean _headerOut;
boolean _bufferOut;
/* ------------------------------------------------------------ */
/**
*
*/
public SimpleBuffers(ByteBuffer header, ByteBuffer buffer)
{
_header=header;
_buffer=buffer;
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer()
{
synchronized(this)
{
if (_buffer!=null && !_bufferOut)
{
_bufferOut=true;
return _buffer;
}
if (_buffer!=null && _header!=null && _header.capacity()==_buffer.capacity() && !_headerOut)
{
_headerOut=true;
return _header;
}
if (_buffer!=null)
return ByteBuffer.allocate(_buffer.capacity());
return ByteBuffer.allocate(4096);
}
}
/* ------------------------------------------------------------ */
public ByteBuffer getHeader()
{
synchronized(this)
{
if (_header!=null && !_headerOut)
{
_headerOut=true;
return _header;
}
if (_buffer!=null && _header!=null && _header.capacity()==_buffer.capacity() && !_bufferOut)
{
_bufferOut=true;
return _buffer;
}
if (_header!=null)
return ByteBuffer.allocate(_header.capacity());
return ByteBuffer.allocate(4096);
}
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer(int size)
{
synchronized(this)
{
if (_header!=null && _header.capacity()==size)
return getHeader();
if (_buffer!=null && _buffer.capacity()==size)
return getBuffer();
return null;
}
}
/* ------------------------------------------------------------ */
public void returnBuffer(ByteBuffer buffer)
{
synchronized(this)
{
buffer.clear().limit(0);
if (buffer==_header)
_headerOut=false;
if (buffer==_buffer)
_bufferOut=false;
}
}
}

View File

@ -530,7 +530,7 @@ public class SslConnection extends AbstractAsyncConnection
// If any bytes were produced and we have an app read waiting, make it ready.
if (result.bytesProduced()>0 && !_appReadFuture.isDone())
_appReadFuture.ready();
_appReadFuture.complete();
return result.bytesConsumed()>0 || result.bytesProduced()>0;
}
@ -805,7 +805,7 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining())
return;
}
_appWriteFuture.ready();
_appWriteFuture.complete();
}
catch (IOException e)
{

View File

@ -1,132 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
/* ------------------------------------------------------------ */
/** Abstract Buffer pool.
* simple unbounded pool of buffers for header, request and response sizes.
*
*/
public class ThreadLocalBuffers extends AbstractBuffers
{
/* ------------------------------------------------------------ */
private final ThreadLocal<ThreadBuffers> _buffers=new ThreadLocal<ThreadBuffers>()
{
@Override
protected ThreadBuffers initialValue()
{
return new ThreadBuffers();
}
};
/* ------------------------------------------------------------ */
public ThreadLocalBuffers(Buffers.Type headerType, int headerSize, Buffers.Type bufferType, int bufferSize, Buffers.Type otherType)
{
super(headerType,headerSize,bufferType,bufferSize,otherType);
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer()
{
ThreadBuffers buffers = _buffers.get();
if (buffers._buffer!=null)
{
ByteBuffer b=buffers._buffer;
buffers._buffer=null;
return b;
}
if (buffers._other!=null && isBuffer(buffers._other))
{
ByteBuffer b=buffers._other;
buffers._other=null;
return b;
}
return newBuffer();
}
/* ------------------------------------------------------------ */
public ByteBuffer getHeader()
{
ThreadBuffers buffers = _buffers.get();
if (buffers._header!=null)
{
ByteBuffer b=buffers._header;
buffers._header=null;
return b;
}
if (buffers._other!=null && isHeader(buffers._other))
{
ByteBuffer b=buffers._other;
buffers._other=null;
return b;
}
return newHeader();
}
/* ------------------------------------------------------------ */
public ByteBuffer getBuffer(int size)
{
ThreadBuffers buffers = _buffers.get();
if (buffers._other!=null && buffers._other.capacity()==size)
{
ByteBuffer b=buffers._other;
buffers._other=null;
return b;
}
return newBuffer(size);
}
/* ------------------------------------------------------------ */
public void returnBuffer(ByteBuffer buffer)
{
buffer.clear().limit(0);
if (buffer.isReadOnly())
return;
ThreadBuffers buffers = _buffers.get();
if (buffers._header==null && isHeader(buffer))
buffers._header=buffer;
else if (buffers._buffer==null && isBuffer(buffer))
buffers._buffer=buffer;
else
buffers._other=buffer;
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return "{{"+getHeaderSize()+","+getBufferSize()+"}}";
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
protected static class ThreadBuffers
{
ByteBuffer _buffer;
ByteBuffer _header;
ByteBuffer _other;
}
}

View File

@ -189,8 +189,8 @@ public class IOFutureTest
}, null);
long start=System.currentTimeMillis();
assertFalse(future.block(10,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(9L));
assertFalse(future.block(100,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(10L));
assertFalse(ready.get());
assertNull(fail.get());
@ -203,13 +203,13 @@ public class IOFutureTest
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){e.printStackTrace();}
f0.ready();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f0.complete();
}
}.start();
assertTrue(future.block(1000,TimeUnit.MILLISECONDS));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(49L));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,lessThan(1000L));
assertTrue(future.isDone());
@ -230,13 +230,13 @@ public class IOFutureTest
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){e.printStackTrace();}
f1.ready();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f1.complete();
}
}.start();
future.block();
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(49L));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
assertTrue(future.isDone());
assertTrue(future.isComplete());
@ -273,8 +273,8 @@ public class IOFutureTest
}, null);
long start=System.currentTimeMillis();
assertFalse(future.block(10,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(9L));
assertFalse(future.block(100,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(10L));
assertFalse(ready.get());
assertNull(fail.get());
@ -286,7 +286,7 @@ public class IOFutureTest
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){e.printStackTrace();}
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f0.fail(ex);
}
}.start();
@ -300,7 +300,7 @@ public class IOFutureTest
{
Assert.assertEquals(ex,e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(49L));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,lessThan(1000L));
assertTrue(future.isDone());
@ -329,7 +329,7 @@ public class IOFutureTest
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){e.printStackTrace();}
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f1.fail(ex);
}
}.start();
@ -343,7 +343,7 @@ public class IOFutureTest
{
Assert.assertEquals(ex,e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(49L));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
assertTrue(future.isDone());
try

View File

@ -171,7 +171,6 @@ public class SelectChannelEndPointTest
// Timeout does not close, so echo exception then shutdown
try
{
System.err.println(e);
_endp.write(BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))).block();
_endp.shutdownOutput();
}

View File

@ -1,219 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.toolchain.test.Stress;
import org.junit.Test;
public class ThreadLocalBuffersTest
{
private Buffers httpBuffers;
private List<Thread> threadList = new ArrayList<Thread>();
private int numThreads = Stress.isEnabled()?100:10;
private int runTestLength = Stress.isEnabled()?5000:1000;
private boolean runTest = false;
private AtomicLong buffersRetrieved;
private void execAbstractBuffer() throws Exception
{
threadList.clear();
buffersRetrieved = new AtomicLong( 0 );
httpBuffers = new InnerBuffers(1024,4096);
for ( int i = 0; i < numThreads; ++i )
{
threadList.add( new BufferPeeper( "BufferPeeper: " + i ) );
}
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
runTest = true;
Thread.sleep( runTestLength );
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
runTest = false;
long totalBuffersRetrieved = buffersRetrieved.get();
System.out.println( "Buffers Retrieved: " + totalBuffersRetrieved );
System.out.println( "Memory Used: " + ( mem1 - mem0 ) );
for (Thread t : threadList)
t.stop();
}
@Test
public void testAbstractBuffers() throws Exception
{
execAbstractBuffer( );
}
@Test
public void testDifferentSizes() throws Exception
{
InnerBuffers buffers = new InnerBuffers(128,256);
ByteBuffer h1 = buffers.getHeader();
ByteBuffer h2 = buffers.getHeader();
ByteBuffer b1 = buffers.getBuffer();
ByteBuffer b2 = buffers.getBuffer();
ByteBuffer b3 = buffers.getBuffer(512);
buffers.returnBuffer(h1);
buffers.returnBuffer(h2);
buffers.returnBuffer(b1);
buffers.returnBuffer(b2);
buffers.returnBuffer(b3);
assertTrue(h1==buffers.getHeader()); // pooled header
assertTrue(h2!=buffers.getHeader()); // b2 replaced h2 in other slot
assertTrue(b1==buffers.getBuffer()); // pooled buffer
assertTrue(b2!=buffers.getBuffer()); // b3 replaced b2 in other slot
assertTrue(b3==buffers.getBuffer(512)); // b2 from other slot
buffers.returnBuffer(h1);
buffers.returnBuffer(h2);
buffers.returnBuffer(b1);
assertTrue(h1==buffers.getHeader()); // pooled header
assertTrue(h2==buffers.getHeader()); // h2 in other slot
assertTrue(b1==buffers.getBuffer()); // pooled buffer
assertTrue(b2!=buffers.getBuffer()); // new buffer
assertTrue(b3!=buffers.getBuffer(512)); // new buffer
// check that sizes are respected
buffers.returnBuffer(b3);
buffers.returnBuffer(b1);
buffers.returnBuffer(b2);
buffers.returnBuffer(h1);
buffers.returnBuffer(h2);
assertTrue(h1==buffers.getHeader()); // pooled header
assertTrue(h2==buffers.getHeader()); // h2 in other slot
assertTrue(b1==buffers.getBuffer()); // pooled buffer
assertTrue(b2!=buffers.getBuffer()); // new buffer
assertTrue(b3!=buffers.getBuffer(512)); // new buffer
}
@Test
public void testSameSizes() throws Exception
{
InnerBuffers buffers = new InnerBuffers(128,128);
ByteBuffer h1 = buffers.getHeader();
ByteBuffer h2 = buffers.getHeader();
ByteBuffer b1 = buffers.getBuffer();
ByteBuffer b2 = buffers.getBuffer();
ByteBuffer b3 = buffers.getBuffer(128);
List<ByteBuffer> known = new ArrayList<ByteBuffer>();
known.add(h1);
known.add(h2);
known.add(b1);
known.add(b2);
known.add(b3);
buffers.returnBuffer(h1); // header slot *
buffers.returnBuffer(h2); // other slot
buffers.returnBuffer(b1); // buffer slot *
buffers.returnBuffer(b2); // other slot
buffers.returnBuffer(b3); // other slot *
assertTrue(h1==buffers.getHeader()); // pooled header
assertTrue(b3==buffers.getHeader()); // pooled other
ByteBuffer buffer = buffers.getHeader();
for (ByteBuffer b:known) assertTrue(b!=buffer); // new buffer
assertTrue(b1==buffers.getBuffer()); // b1 used from buffer slot
buffer = buffers.getBuffer();
for (ByteBuffer b:known) assertTrue(b!=buffer); // new buffer
buffer = buffers.getBuffer(128);
for (ByteBuffer b:known) assertTrue(b!=buffer); // new buffer
}
private static class InnerBuffers extends ThreadLocalBuffers
{
InnerBuffers(int headerSize,int bufferSize)
{
super(Type.INDIRECT,headerSize,Type.DIRECT,bufferSize,Type.INDIRECT);
}
}
private class BufferPeeper extends Thread
{
private final String _bufferName;
public BufferPeeper( String bufferName )
{
_bufferName = bufferName;
start();
}
@Override
public void run()
{
while ( true )
{
try
{
if ( runTest )
{
ByteBuffer buf = httpBuffers.getHeader();
buffersRetrieved.getAndIncrement();
buf.compact().put(new Byte("2")).flip();
// sleep( threadWaitTime );
httpBuffers.returnBuffer(buf);
}
else
{
sleep( 1 );
}
}
catch ( Exception e )
{
e.printStackTrace();
break;
}
}
}
}
}

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.AggregateLifeCycle;

View File

@ -440,7 +440,7 @@ public class ConnectHandler extends HandlerWrapper
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
proxyToServer.ready();
proxyToServer.complete();
}
@Override