Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-09-28 11:53:54 +02:00
commit 7e376fd9da
14 changed files with 574 additions and 1197 deletions

View File

@ -19,10 +19,10 @@
package org.eclipse.jetty.fcgi.generator; package org.eclipse.jetty.fcgi.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -31,7 +31,7 @@ public class Flusher
{ {
private static final Logger LOG = Log.getLogger(Flusher.class); private static final Logger LOG = Log.getLogger(Flusher.class);
private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>(); private final Queue<Generator.Result> queue = new ArrayDeque<>();
private final IteratingCallback flushCallback = new FlushCallback(); private final IteratingCallback flushCallback = new FlushCallback();
private final EndPoint endPoint; private final EndPoint endPoint;
@ -43,10 +43,26 @@ public class Flusher
public void flush(Generator.Result... results) public void flush(Generator.Result... results)
{ {
for (Generator.Result result : results) for (Generator.Result result : results)
queue.offer(result); offer(result);
flushCallback.iterate(); flushCallback.iterate();
} }
private void offer(Generator.Result result)
{
synchronized (this)
{
queue.offer(result);
}
}
private Generator.Result poll()
{
synchronized (this)
{
return queue.poll();
}
}
public void shutdown() public void shutdown()
{ {
flush(new ShutdownResult()); flush(new ShutdownResult());
@ -60,7 +76,7 @@ public class Flusher
protected Action process() throws Exception protected Action process() throws Exception
{ {
// Look if other writes are needed. // Look if other writes are needed.
Generator.Result result = queue.poll(); Generator.Result result = poll();
if (result == null) if (result == null)
{ {
// No more writes to do, return. // No more writes to do, return.
@ -71,7 +87,7 @@ public class Flusher
// Most often there is another result in the // Most often there is another result in the
// queue so this is a real optimization because // queue so this is a real optimization because
// it sends both results in just one TCP packet. // it sends both results in just one TCP packet.
Generator.Result other = queue.poll(); Generator.Result other = poll();
if (other != null) if (other != null)
result = result.join(other); result = result.join(other);
@ -106,7 +122,7 @@ public class Flusher
while (true) while (true)
{ {
Generator.Result result = queue.poll(); Generator.Result result = poll();
if (result == null) if (result == null)
break; break;
result.failed(x); result.failed(x);

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -30,7 +31,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ -42,7 +42,7 @@ public class HTTP2Connection extends AbstractConnection
{ {
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>(); private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer(); private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong(); private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
@ -157,7 +157,7 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch) protected void offerTask(Runnable task, boolean dispatch)
{ {
tasks.offer(task); offerTask(task);
// Because producing calls parse and parse can call offerTask, we have to make sure // Because producing calls parse and parse can call offerTask, we have to make sure
// we use the same strategy otherwise produce can be reentrant and that messes with // we use the same strategy otherwise produce can be reentrant and that messes with
@ -178,6 +178,22 @@ public class HTTP2Connection extends AbstractConnection
session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
} }
private void offerTask(Runnable task)
{
synchronized (this)
{
tasks.offer(task);
}
}
private Runnable pollTask()
{
synchronized (this)
{
return tasks.poll();
}
}
protected class HTTP2Producer implements ExecutionStrategy.Producer protected class HTTP2Producer implements ExecutionStrategy.Producer
{ {
private final Callback fillableCallback = new FillableCallback(); private final Callback fillableCallback = new FillableCallback();
@ -186,7 +202,7 @@ public class HTTP2Connection extends AbstractConnection
@Override @Override
public synchronized Runnable produce() public synchronized Runnable produce()
{ {
Runnable task = tasks.poll(); Runnable task = pollTask();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task); LOG.debug("Dequeued task {}", task);
if (task != null) if (task != null)
@ -205,7 +221,7 @@ public class HTTP2Connection extends AbstractConnection
while (buffer.hasRemaining()) while (buffer.hasRemaining())
parser.parse(buffer); parser.parse(buffer);
task = tasks.poll(); task = pollTask();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task); LOG.debug("Dequeued new task {}", task);
if (task != null) if (task != null)

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.io.Closeable; import java.io.Closeable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
@ -54,7 +55,6 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
@ -83,7 +83,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
} }
} }
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>(); private final Queue<HttpChannelOverHTTP2> channels = new ArrayDeque<>();
private final List<Frame> upgradeFrames = new ArrayList<>(); private final List<Frame> upgradeFrames = new ArrayList<>();
private final AtomicLong totalRequests = new AtomicLong(); private final AtomicLong totalRequests = new AtomicLong();
private final AtomicLong totalResponses = new AtomicLong(); private final AtomicLong totalResponses = new AtomicLong();
@ -216,7 +216,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{ {
HttpChannelOverHTTP2 channel = channels.poll(); HttpChannelOverHTTP2 channel = pollChannel();
if (channel != null) if (channel != null)
{ {
channel.getHttpTransport().setStream(stream); channel.getHttpTransport().setStream(stream);
@ -235,6 +235,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return channel; return channel;
} }
private void offerChannel(HttpChannelOverHTTP2 channel)
{
synchronized (this)
{
channels.offer(channel);
}
}
private HttpChannelOverHTTP2 pollChannel()
{
synchronized (this)
{
return channels.poll();
}
}
public boolean upgrade(Request request) public boolean upgrade(Request request)
{ {
if (HttpMethod.PRI.is(request.getMethod())) if (HttpMethod.PRI.is(request.getMethod()))
@ -297,7 +313,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
{ {
getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE); getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE);
super.recycle(); super.recycle();
channels.offer(this); offerChannel(this);
} }
@Override @Override

View File

@ -19,13 +19,15 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
/** /**
* <p>A {@link ByteBuffer} pool.</p> * <p>A {@link ByteBuffer} pool.</p>
@ -55,7 +57,7 @@ public interface ByteBufferPool
* @see #acquire(int, boolean) * @see #acquire(int, boolean)
*/ */
public void release(ByteBuffer buffer); public void release(ByteBuffer buffer);
default ByteBuffer newByteBuffer(int capacity, boolean direct) default ByteBuffer newByteBuffer(int capacity, boolean direct)
{ {
return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity); return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
@ -124,73 +126,133 @@ public interface ByteBufferPool
} }
} }
class Bucket class Bucket
{ {
private final Lock _lock = new ReentrantLock();
private final Queue<ByteBuffer> _queue = new ArrayDeque<>();
private final ByteBufferPool _pool; private final ByteBufferPool _pool;
private final int _capacity; private final int _capacity;
private final AtomicInteger _space; private final AtomicInteger _space;
private final Queue<ByteBuffer> _queue= new ConcurrentArrayQueue<>();
public Bucket(ByteBufferPool pool, int bufferSize,int maxSize) public Bucket(ByteBufferPool pool, int bufferSize, int maxSize)
{ {
_pool=pool; _pool = pool;
_capacity=bufferSize; _capacity = bufferSize;
_space=maxSize>0?new AtomicInteger(maxSize):null; _space = maxSize > 0 ? new AtomicInteger(maxSize) : null;
} }
public ByteBuffer acquire(boolean direct)
{
ByteBuffer buffer = queuePoll();
if (buffer == null)
return _pool.newByteBuffer(_capacity, direct);
if (_space != null)
_space.incrementAndGet();
return buffer;
}
public void release(ByteBuffer buffer) public void release(ByteBuffer buffer)
{ {
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
if (_space==null) if (_space == null)
_queue.offer(buffer); queueOffer(buffer);
else if (_space.decrementAndGet()>=0) else if (_space.decrementAndGet() >= 0)
_queue.offer(buffer); queueOffer(buffer);
else else
_space.incrementAndGet(); _space.incrementAndGet();
} }
public ByteBuffer acquire(boolean direct)
{
ByteBuffer buffer = _queue.poll();
if (buffer == null)
return _pool.newByteBuffer(_capacity,direct);
if (_space!=null)
_space.incrementAndGet();
return buffer;
}
public void clear() public void clear()
{ {
if (_space==null) if (_space == null)
_queue.clear(); {
queueClear();
}
else else
{ {
int s=_space.getAndSet(0); int s = _space.getAndSet(0);
while(s-->0) while (s-- > 0)
{ {
if (_queue.poll()==null) if (queuePoll() == null)
_space.incrementAndGet(); _space.incrementAndGet();
} }
} }
} }
private void queueOffer(ByteBuffer buffer)
{
Lock lock = _lock;
lock.lock();
try
{
_queue.offer(buffer);
}
finally
{
lock.unlock();
}
}
private ByteBuffer queuePoll()
{
Lock lock = _lock;
lock.lock();
try
{
return _queue.poll();
}
finally
{
lock.unlock();
}
}
private void queueClear()
{
Lock lock = _lock;
lock.lock();
try
{
_queue.clear();
}
finally
{
lock.unlock();
}
}
boolean isEmpty() boolean isEmpty()
{ {
return _queue.isEmpty(); Lock lock = _lock;
lock.lock();
try
{
return _queue.isEmpty();
}
finally
{
lock.unlock();
}
} }
int size() int size()
{ {
return _queue.size(); Lock lock = _lock;
lock.lock();
try
{
return _queue.size();
}
finally
{
lock.unlock();
}
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("Bucket@%x{%d,%d}",hashCode(),_capacity,_queue.size()); return String.format("Bucket@%x{%d/%d}", hashCode(), size(), _capacity);
} }
} }
} }

View File

@ -18,93 +18,76 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class HttpInputTest public class HttpInputTest
{ {
Queue<String> _history = new ConcurrentArrayQueue<String>() private final Queue<String> _history = new LinkedBlockingQueue<>();
{ private final Queue<String> _fillAndParseSimulate = new LinkedBlockingQueue<>();
@Override private final ReadListener _listener = new ReadListener()
public boolean add(String s)
{
//System.err.println("history: "+s);
return super.add(s);
}
};
Queue<String> _fillAndParseSimulate = new ConcurrentArrayQueue<>();
HttpInput _in;
ReadListener _listener = new ReadListener()
{ {
@Override @Override
public void onError(Throwable t) public void onError(Throwable t)
{ {
_history.add("onError:"+t); _history.add("onError:" + t);
} }
@Override @Override
public void onDataAvailable() throws IOException public void onDataAvailable() throws IOException
{ {
_history.add("onDataAvailable"); _history.add("onDataAvailable");
} }
@Override @Override
public void onAllDataRead() throws IOException public void onAllDataRead() throws IOException
{ {
_history.add("onAllDataRead"); _history.add("onAllDataRead");
} }
}; };
private HttpInput _in;
public class TContent extends HttpInput.Content public class TContent extends HttpInput.Content
{ {
private final String _content; private final String _content;
public TContent(String content) public TContent(String content)
{ {
super(BufferUtil.toBuffer(content)); super(BufferUtil.toBuffer(content));
_content=content; _content = content;
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
_history.add("Content succeeded "+_content); _history.add("Content succeeded " + _content);
super.succeeded(); super.succeeded();
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
_history.add("Content failed "+_content); _history.add("Content failed " + _content);
super.failed(x); super.failed(x);
} }
} }
@Before @Before
public void before() public void before()
{ {
_in=new HttpInput(new HttpChannelState(new HttpChannel(null,new HttpConfiguration(),null,null) _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null)
{ {
@Override @Override
public void asyncReadFillInterested() public void asyncReadFillInterested()
@ -113,7 +96,6 @@ public class HttpInputTest
} }
}) })
{ {
@Override @Override
public void onReadUnready() public void onReadUnready()
{ {
@ -127,7 +109,7 @@ public class HttpInputTest
_history.add("onReadPossible"); _history.add("onReadPossible");
return super.onReadPossible(); return super.onReadPossible();
} }
@Override @Override
public boolean onReadReady() public boolean onReadReady()
{ {
@ -139,9 +121,9 @@ public class HttpInputTest
@Override @Override
protected void produceContent() throws IOException protected void produceContent() throws IOException
{ {
_history.add("produceContent "+_fillAndParseSimulate.size()); _history.add("produceContent " + _fillAndParseSimulate.size());
for (String s=_fillAndParseSimulate.poll();s!=null;s=_fillAndParseSimulate.poll()) for (String s = _fillAndParseSimulate.poll(); s != null; s = _fillAndParseSimulate.poll())
{ {
if ("_EOF_".equals(s)) if ("_EOF_".equals(s))
_in.eof(); _in.eof();
@ -153,28 +135,28 @@ public class HttpInputTest
@Override @Override
protected void blockForContent() throws IOException protected void blockForContent() throws IOException
{ {
_history.add("blockForContent"); _history.add("blockForContent");
super.blockForContent(); super.blockForContent();
} }
}; };
} }
@After @After
public void after() public void after()
{ {
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testEmpty() throws Exception public void testEmpty() throws Exception
{ {
assertThat(_in.available(),equalTo(0)); Assert.assertThat(_in.available(), Matchers.equalTo(0));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
@ -184,44 +166,43 @@ public class HttpInputTest
_in.addContent(new TContent("CD")); _in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF"); _fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH"); _fillAndParseSimulate.offer("GH");
assertThat(_in.available(),equalTo(2)); Assert.assertThat(_in.available(), Matchers.equalTo(2));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.getContentConsumed(),equalTo(0L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertThat(_in.getContentConsumed(),equalTo(1L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
assertThat(_in.read(),equalTo((int)'B')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
assertThat(_in.getContentConsumed(),equalTo(2L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'E'));
assertThat(_in.read(),equalTo((int)'F'));
assertThat(_history.poll(),equalTo("produceContent 2")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded EF")); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
assertThat(_in.read(),equalTo((int)'G')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
assertThat(_in.read(),equalTo((int)'H'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
assertThat(_history.poll(),equalTo("Content succeeded GH")); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'E'));
assertThat(_in.getContentConsumed(),equalTo(8L)); Assert.assertThat(_in.read(), Matchers.equalTo((int)'F'));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'G'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'H'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testReRead() throws Exception public void testReRead() throws Exception
{ {
@ -229,88 +210,86 @@ public class HttpInputTest
_in.addContent(new TContent("CD")); _in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF"); _fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH"); _fillAndParseSimulate.offer("GH");
assertThat(_in.available(),equalTo(2)); Assert.assertThat(_in.available(), Matchers.equalTo(2));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'E'));
assertThat(_in.getContentConsumed(),equalTo(0L));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.getContentConsumed(),equalTo(1L));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_in.getContentConsumed(),equalTo(2L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'E'));
_in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde"))); _in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde")));
assertThat(_in.available(),equalTo(5)); Assert.assertThat(_in.available(), Matchers.equalTo(5));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.getContentConsumed(),equalTo(0L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
assertThat(_in.read(),equalTo((int)'a')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'a'));
assertThat(_in.getContentConsumed(),equalTo(1L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
assertThat(_in.read(),equalTo((int)'b')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'b'));
assertThat(_in.getContentConsumed(),equalTo(2L)); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
assertThat(_in.read(),equalTo((int)'c')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'c'));
assertThat(_in.read(),equalTo((int)'d')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'d'));
assertThat(_in.read(),equalTo((int)'e')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'e'));
assertThat(_in.read(),equalTo((int)'F'));
assertThat(_history.poll(),equalTo("produceContent 2")); Assert.assertThat(_in.read(), Matchers.equalTo((int)'F'));
assertThat(_history.poll(),equalTo("Content succeeded EF"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
assertThat(_in.read(),equalTo((int)'G')); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.read(),equalTo((int)'H'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'G'));
assertThat(_history.poll(),equalTo("Content succeeded GH")); Assert.assertThat(_in.read(), Matchers.equalTo((int)'H'));
assertThat(_history.poll(),nullValue());
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
assertThat(_in.getContentConsumed(),equalTo(8L)); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),nullValue()); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testBlockingRead() throws Exception public void testBlockingRead() throws Exception
{ {
new Thread() new Thread()
{ {
public void run() public void run()
{ {
try try
{ {
Thread.sleep(500); Thread.sleep(500);
_in.addContent(new TContent("AB")); _in.addContent(new TContent("AB"));
} }
catch(Throwable th) catch (Throwable th)
{ {
th.printStackTrace(); th.printStackTrace();
} }
} }
}.start(); }.start();
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent")); Assert.assertThat(_history.poll(), Matchers.equalTo("blockForContent"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.read(),equalTo((int)'B')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
@ -320,26 +299,26 @@ public class HttpInputTest
_in.addContent(new TContent("CD")); _in.addContent(new TContent("CD"));
_in.eof(); _in.eof();
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.available(),equalTo(2)); Assert.assertThat(_in.available(), Matchers.equalTo(2));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertThat(_in.read(),equalTo((int)'B')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.read(),equalTo((int)'C')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.read(),equalTo((int)'D')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.read(),equalTo(-1)); Assert.assertThat(_in.read(), Matchers.equalTo(-1));
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
@ -349,256 +328,251 @@ public class HttpInputTest
_in.addContent(new TContent("CD")); _in.addContent(new TContent("CD"));
_in.earlyEOF(); _in.earlyEOF();
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.available(),equalTo(2)); Assert.assertThat(_in.available(), Matchers.equalTo(2));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo((int)'D'));
try try
{ {
_in.read(); _in.read();
fail(); Assert.fail();
} }
catch(EOFException eof) catch (EOFException eof)
{ {
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
} }
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded CD")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testBlockingEOF() throws Exception public void testBlockingEOF() throws Exception
{ {
new Thread() new Thread()
{ {
public void run() public void run()
{ {
try try
{ {
Thread.sleep(500); Thread.sleep(500);
_in.eof(); _in.eof();
} }
catch(Throwable th) catch (Throwable th)
{ {
th.printStackTrace(); th.printStackTrace();
} }
} }
}.start(); }.start();
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.read(),equalTo(-1)); Assert.assertThat(_in.read(), Matchers.equalTo(-1));
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent")); Assert.assertThat(_history.poll(), Matchers.equalTo("blockForContent"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testAsyncEmpty() throws Exception public void testAsyncEmpty() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false)); _in.run();
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
assertThat(_in.isReady(),equalTo(false)); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testAsyncRead() throws Exception public void testAsyncRead() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.addContent(new TContent("AB")); _in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("CD"); _fillAndParseSimulate.add("CD");
assertThat(_history.poll(),equalTo("onReadPossible")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C')); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testAsyncEOF() throws Exception public void testAsyncEOF() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.eof(); _in.eof();
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_history.poll(),equalTo("onReadPossible")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.read(),equalTo(-1)); Assert.assertThat(_in.read(), Matchers.equalTo(-1));
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
assertThat(_history.poll(),equalTo("ready")); Assert.assertThat(_history.poll(), Matchers.equalTo("ready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testAsyncReadEOF() throws Exception public void testAsyncReadEOF() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.addContent(new TContent("AB")); _in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("_EOF_"); _fillAndParseSimulate.add("_EOF_");
assertThat(_history.poll(),equalTo("onReadPossible")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_in.isReady(),equalTo(true)); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isFinished(),equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
assertThat(_in.read(),equalTo(-1)); Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("ready")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),nullValue());
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.read(), Matchers.equalTo(-1));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
Assert.assertThat(_history.poll(), Matchers.equalTo("ready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testAsyncError() throws Exception public void testAsyncError() throws Exception
{ {
_in.setReadListener(_listener); _in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready")); Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run(); _in.run();
assertThat(_history.poll(),equalTo("onDataAvailable")); Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.failed(new TimeoutException()); _in.failed(new TimeoutException());
assertThat(_history.poll(),equalTo("onReadPossible")); Assert.assertThat(_history.poll(), Matchers.equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("onError:java.util.concurrent.TimeoutException"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true)); _in.run();
Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
Assert.assertThat(_history.poll(), Matchers.equalTo("onError:java.util.concurrent.TimeoutException"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
try try
{ {
_in.read(); _in.read();
fail(); Assert.fail();
} }
catch(IOException e) catch (IOException e)
{ {
assertThat(e.getCause(),Matchers.instanceOf(TimeoutException.class)); Assert.assertThat(e.getCause(), Matchers.instanceOf(TimeoutException.class));
assertThat(_in.isFinished(),equalTo(true)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
} }
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testRecycle() throws Exception public void testRecycle() throws Exception
@ -609,7 +583,7 @@ public class HttpInputTest
_in.recycle(); _in.recycle();
testReadEOF(); testReadEOF();
} }
@Test @Test
public void testConsumeAll() throws Exception public void testConsumeAll() throws Exception
{ {
@ -617,20 +591,20 @@ public class HttpInputTest
_in.addContent(new TContent("CD")); _in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF"); _fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH"); _fillAndParseSimulate.offer("GH");
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertFalse(_in.consumeAll());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertFalse(_in.consumeAll());
assertThat(_history.poll(),equalTo("Content succeeded CD")); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
assertThat(_history.poll(),equalTo("produceContent 2"));
assertThat(_history.poll(),equalTo("Content succeeded EF")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded GH")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
assertThat(_history.poll(),equalTo("produceContent 0")); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test @Test
public void testConsumeAllEOF() throws Exception public void testConsumeAllEOF() throws Exception
{ {
@ -639,16 +613,16 @@ public class HttpInputTest
_fillAndParseSimulate.offer("EF"); _fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH"); _fillAndParseSimulate.offer("GH");
_fillAndParseSimulate.offer("_EOF_"); _fillAndParseSimulate.offer("_EOF_");
assertThat(_in.read(),equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
assertTrue(_in.consumeAll());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),equalTo("Content succeeded AB")); Assert.assertTrue(_in.consumeAll());
assertThat(_history.poll(),equalTo("Content succeeded CD")); Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
assertThat(_history.poll(),equalTo("produceContent 3"));
assertThat(_history.poll(),equalTo("Content succeeded EF")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded GH")); Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue()); Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 3"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
} }

View File

@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName; import javax.net.ssl.SNIServerName;
@ -55,7 +56,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SocketCustomizationListener;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -367,7 +367,7 @@ public class SniSslConnectionFactoryTest
@Test @Test
public void testSocketCustomization() throws Exception public void testSocketCustomization() throws Exception
{ {
final Queue<String> history = new ConcurrentArrayQueue<>(); final Queue<String> history = new LinkedBlockingQueue<>();
_connector.addBean(new SocketCustomizationListener() _connector.addBean(new SocketCustomizationListener()
{ {

View File

@ -28,6 +28,7 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName; import javax.net.ssl.SNIServerName;
@ -49,7 +50,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SocketCustomizationListener;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -198,8 +198,8 @@ public class SslConnectionFactoryTest
@Test @Test
public void testSocketCustomization() throws Exception public void testSocketCustomization() throws Exception
{ {
final Queue<String> history = new ConcurrentArrayQueue<>(); final Queue<String> history = new LinkedBlockingQueue<>();
_connector.addBean(new SocketCustomizationListener() _connector.addBean(new SocketCustomizationListener()
{ {
@Override @Override

View File

@ -1,577 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
* to store elements.
* <p>
* This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
* but producing less garbage because arrays are used to store elements rather than nodes.
* </p>
* <p>
* The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
* (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
* </p>
*
* @param <T> the Array entry type
*/
public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
{
public static final int DEFAULT_BLOCK_SIZE = 512;
public static final Object REMOVED_ELEMENT = new Object()
{
@Override
public String toString()
{
return "X";
}
};
private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
private final int _blockSize;
public ConcurrentArrayQueue()
{
this(DEFAULT_BLOCK_SIZE);
}
public ConcurrentArrayQueue(int blockSize)
{
_blockSize = blockSize;
Block<T> block = newBlock();
_blocks.set(HEAD_OFFSET,block);
_blocks.set(TAIL_OFFSET,block);
}
public int getBlockSize()
{
return _blockSize;
}
protected Block<T> getHeadBlock()
{
return _blocks.get(HEAD_OFFSET);
}
protected Block<T> getTailBlock()
{
return _blocks.get(TAIL_OFFSET);
}
@Override
public boolean offer(T item)
{
item = Objects.requireNonNull(item);
final Block<T> initialTailBlock = getTailBlock();
Block<T> currentTailBlock = initialTailBlock;
int tail = currentTailBlock.tail();
while (true)
{
if (tail == getBlockSize())
{
Block<T> nextTailBlock = currentTailBlock.next();
if (nextTailBlock == null)
{
nextTailBlock = newBlock();
if (currentTailBlock.link(nextTailBlock))
{
// Linking succeeded, loop
currentTailBlock = nextTailBlock;
}
else
{
// Concurrent linking, use other block and loop
currentTailBlock = currentTailBlock.next();
}
}
else
{
// Not at last block, loop
currentTailBlock = nextTailBlock;
}
tail = currentTailBlock.tail();
}
else
{
if (currentTailBlock.peek(tail) == null)
{
if (currentTailBlock.store(tail, item))
{
// Item stored
break;
}
else
{
// Concurrent store, try next index
++tail;
}
}
else
{
// Not free, try next index
++tail;
}
}
}
updateTailBlock(initialTailBlock, currentTailBlock);
return true;
}
private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
{
// Update the tail block pointer if needs to
if (oldTailBlock != newTailBlock)
{
// The tail block pointer is allowed to lag behind.
// If this update fails, it means that other threads
// have filled this block and installed a new one.
casTailBlock(oldTailBlock, newTailBlock);
}
}
protected boolean casTailBlock(Block<T> current, Block<T> update)
{
return _blocks.compareAndSet(TAIL_OFFSET,current,update);
}
@SuppressWarnings("unchecked")
@Override
public T poll()
{
final Block<T> initialHeadBlock = getHeadBlock();
Block<T> currentHeadBlock = initialHeadBlock;
int head = currentHeadBlock.head();
T result = null;
while (true)
{
if (head == getBlockSize())
{
Block<T> nextHeadBlock = currentHeadBlock.next();
if (nextHeadBlock == null)
{
// We could have read that the next head block was null
// but another thread allocated a new block and stored a
// new item. This thread could not detect this, but that
// is ok, otherwise we would not be able to exit this loop.
// Queue is empty
break;
}
else
{
// Use next block and loop
currentHeadBlock = nextHeadBlock;
head = currentHeadBlock.head();
}
}
else
{
Object element = currentHeadBlock.peek(head);
if (element == REMOVED_ELEMENT)
{
// Already removed, try next index
++head;
}
else
{
result = (T)element;
if (result != null)
{
if (currentHeadBlock.remove(head, result, true))
{
// Item removed
break;
}
else
{
// Concurrent remove, try next index
++head;
}
}
else
{
// Queue is empty
break;
}
}
}
}
updateHeadBlock(initialHeadBlock, currentHeadBlock);
return result;
}
private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
{
// Update the head block pointer if needs to
if (oldHeadBlock != newHeadBlock)
{
// The head block pointer lagged behind.
// If this update fails, it means that other threads
// have emptied this block and pointed to a new one.
casHeadBlock(oldHeadBlock, newHeadBlock);
}
}
protected boolean casHeadBlock(Block<T> current, Block<T> update)
{
return _blocks.compareAndSet(HEAD_OFFSET,current,update);
}
@Override
public T peek()
{
Block<T> currentHeadBlock = getHeadBlock();
int head = currentHeadBlock.head();
while (true)
{
if (head == getBlockSize())
{
Block<T> nextHeadBlock = currentHeadBlock.next();
if (nextHeadBlock == null)
{
// Queue is empty
return null;
}
else
{
// Use next block and loop
currentHeadBlock = nextHeadBlock;
head = currentHeadBlock.head();
}
}
else
{
T element = currentHeadBlock.peek(head);
if (element == REMOVED_ELEMENT)
{
// Already removed, try next index
++head;
}
else
{
return element;
}
}
}
}
@Override
public boolean remove(Object o)
{
Block<T> currentHeadBlock = getHeadBlock();
int head = currentHeadBlock.head();
boolean result = false;
while (true)
{
if (head == getBlockSize())
{
Block<T> nextHeadBlock = currentHeadBlock.next();
if (nextHeadBlock == null)
{
// Not found
break;
}
else
{
// Use next block and loop
currentHeadBlock = nextHeadBlock;
head = currentHeadBlock.head();
}
}
else
{
Object element = currentHeadBlock.peek(head);
if (element == REMOVED_ELEMENT)
{
// Removed, try next index
++head;
}
else
{
if (element == null)
{
// Not found
break;
}
else
{
if (element.equals(o))
{
// Found
if (currentHeadBlock.remove(head, o, false))
{
result = true;
break;
}
else
{
++head;
}
}
else
{
// Not the one we're looking for
++head;
}
}
}
}
}
return result;
}
@Override
public boolean removeAll(Collection<?> c)
{
// TODO: super invocations are based on iterator.remove(), which throws
return super.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c)
{
// TODO: super invocations are based on iterator.remove(), which throws
return super.retainAll(c);
}
@Override
public Iterator<T> iterator()
{
final List<Object[]> blocks = new ArrayList<>();
Block<T> currentHeadBlock = getHeadBlock();
while (currentHeadBlock != null)
{
Object[] elements = currentHeadBlock.arrayCopy();
blocks.add(elements);
currentHeadBlock = currentHeadBlock.next();
}
return new Iterator<T>()
{
private int blockIndex;
private int index;
@Override
public boolean hasNext()
{
while (true)
{
if (blockIndex == blocks.size())
return false;
Object element = blocks.get(blockIndex)[index];
if (element == null)
return false;
if (element != REMOVED_ELEMENT)
return true;
advance();
}
}
@Override
public T next()
{
while (true)
{
if (blockIndex == blocks.size())
throw new NoSuchElementException();
Object element = blocks.get(blockIndex)[index];
if (element == null)
throw new NoSuchElementException();
advance();
if (element != REMOVED_ELEMENT) {
@SuppressWarnings("unchecked")
T e = (T)element;
return e;
}
}
}
private void advance()
{
if (++index == getBlockSize())
{
index = 0;
++blockIndex;
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
@Override
public int size()
{
Block<T> currentHeadBlock = getHeadBlock();
int head = currentHeadBlock.head();
int size = 0;
while (true)
{
if (head == getBlockSize())
{
Block<T> nextHeadBlock = currentHeadBlock.next();
if (nextHeadBlock == null)
{
break;
}
else
{
// Use next block and loop
currentHeadBlock = nextHeadBlock;
head = currentHeadBlock.head();
}
}
else
{
Object element = currentHeadBlock.peek(head);
if (element == REMOVED_ELEMENT)
{
// Already removed, try next index
++head;
}
else if (element != null)
{
++size;
++head;
}
else
{
break;
}
}
}
return size;
}
protected Block<T> newBlock()
{
return new Block<>(getBlockSize());
}
protected int getBlockCount()
{
int result = 0;
Block<T> headBlock = getHeadBlock();
while (headBlock != null)
{
++result;
headBlock = headBlock.next();
}
return result;
}
protected static final class Block<E>
{
private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
private final AtomicReferenceArray<Object> elements;
private final AtomicReference<Block<E>> next = new AtomicReference<>();
private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
protected Block(int blockSize)
{
elements = new AtomicReferenceArray<>(blockSize);
}
@SuppressWarnings("unchecked")
public E peek(int index)
{
return (E)elements.get(index);
}
public boolean store(int index, E item)
{
boolean result = elements.compareAndSet(index, null, item);
if (result)
indexes.incrementAndGet(tailOffset);
return result;
}
public boolean remove(int index, Object item, boolean updateHead)
{
boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
if (result && updateHead)
indexes.incrementAndGet(headOffset);
return result;
}
public Block<E> next()
{
return next.get();
}
public boolean link(Block<E> nextBlock)
{
return next.compareAndSet(null, nextBlock);
}
public int head()
{
return indexes.get(headOffset);
}
public int tail()
{
return indexes.get(tailOffset);
}
public Object[] arrayCopy()
{
Object[] result = new Object[elements.length()];
for (int i = 0; i < result.length; ++i)
result[i] = elements.get(i);
return result;
}
}
}

View File

@ -1,172 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.junit.Assert;
import org.junit.Test;
public class ConcurrentArrayQueueTest
{
protected ConcurrentArrayQueue<Integer> newConcurrentArrayQueue(int blockSize)
{
return new ConcurrentArrayQueue<>(blockSize);
}
@Test
public void testOfferCreatesBlock()
{
int blockSize = 2;
ConcurrentArrayQueue<Integer> queue = newConcurrentArrayQueue(blockSize);
int blocks = 3;
for (int i = 0; i < blocks * blockSize + 1; ++i)
queue.offer(i);
Assert.assertEquals(blocks + 1, queue.getBlockCount());
}
@Test
public void testPeekRemove() throws Exception
{
int blockSize = 2;
ConcurrentArrayQueue<Integer> queue = newConcurrentArrayQueue(blockSize);
Assert.assertNull(queue.peek());
queue.offer(1);
queue.remove(1);
Assert.assertNull(queue.peek());
int blocks = 3;
int size = blocks * blockSize + 1;
for (int i = 0; i < size; ++i)
queue.offer(i);
for (int i = 0; i < size; ++i)
{
Assert.assertEquals(i, (int)queue.peek());
Assert.assertEquals(i, (int)queue.remove());
}
}
@Test
public void testRemoveObject() throws Exception
{
int blockSize = 2;
ConcurrentArrayQueue<Integer> queue = newConcurrentArrayQueue(blockSize);
queue.add(1);
queue.add(2);
queue.add(3);
Assert.assertFalse(queue.remove(4));
int size = queue.size();
Assert.assertTrue(queue.remove(2));
--size;
Assert.assertEquals(size, queue.size());
Iterator<Integer> iterator = queue.iterator();
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(1, (int)iterator.next());
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(3, (int)iterator.next());
queue.offer(4);
++size;
Assert.assertTrue(queue.remove(3));
--size;
Assert.assertEquals(size, queue.size());
iterator = queue.iterator();
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(1, (int)iterator.next());
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(4, (int)iterator.next());
Assert.assertTrue(queue.remove(1));
--size;
Assert.assertTrue(queue.remove(4));
--size;
iterator = queue.iterator();
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testSize() throws Exception
{
int blockSize = 2;
ConcurrentArrayQueue<Integer> queue = newConcurrentArrayQueue(blockSize);
queue.offer(1);
Assert.assertEquals(1, queue.size());
queue = newConcurrentArrayQueue(blockSize);
for (int i = 0; i < 2 * blockSize; ++i)
queue.offer(i);
for (int i = 0; i < blockSize; ++i)
queue.poll();
Assert.assertEquals(blockSize, queue.size());
}
@Test
public void testIterator() throws Exception
{
int blockSize = 2;
ConcurrentArrayQueue<Integer> queue = newConcurrentArrayQueue(blockSize);
queue.offer(1);
Iterator<Integer> iterator = queue.iterator();
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(1, (int)iterator.next());
Assert.assertFalse(iterator.hasNext());
try
{
iterator.next();
Assert.fail();
}
catch (NoSuchElementException ignored)
{
}
// Test block edge
queue = newConcurrentArrayQueue(blockSize);
for (int i = 0; i < blockSize * 2; ++i)
queue.offer(i);
queue.poll();
iterator = queue.iterator();
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(1, (int)iterator.next());
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(2, (int)iterator.next());
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(3, (int)iterator.next());
Assert.assertFalse(iterator.hasNext());
try
{
iterator.next();
Assert.fail();
}
catch (NoSuchElementException ignored)
{
}
}
}

View File

@ -40,20 +40,8 @@ import org.junit.runner.RunWith;
public class QueueBenchmarkTest public class QueueBenchmarkTest
{ {
private static final Logger logger = Log.getLogger(QueueBenchmarkTest.class); private static final Logger logger = Log.getLogger(QueueBenchmarkTest.class);
private static final Runnable ELEMENT = new Runnable() private static final Runnable ELEMENT = () -> {};
{ private static final Runnable END = () -> {};
@Override
public void run()
{
}
};
private static final Runnable END = new Runnable()
{
@Override
public void run()
{
}
};
@Stress("High CPU") @Stress("High CPU")
@Test @Test
@ -67,10 +55,9 @@ public class QueueBenchmarkTest
final int iterations = 16 * 1024 * 1024; final int iterations = 16 * 1024 * 1024;
final List<Queue<Runnable>> queues = new ArrayList<>(); final List<Queue<Runnable>> queues = new ArrayList<>();
queues.add(new ConcurrentArrayQueue<Runnable>()); // Jetty lock-free queue, allocating array blocks queues.add(new ConcurrentLinkedQueue<>()); // JDK lock-free queue, allocating nodes
queues.add(new ConcurrentLinkedQueue<Runnable>()); // JDK lock-free queue, allocating nodes queues.add(new ArrayBlockingQueue<>(iterations * writers)); // JDK lock-based, circular array queue
queues.add(new ArrayBlockingQueue<Runnable>(iterations * writers)); // JDK lock-based, circular array queue queues.add(new BlockingArrayQueue<>(iterations * writers)); // Jetty lock-based, circular array queue
queues.add(new BlockingArrayQueue<Runnable>(iterations * writers)); // Jetty lock-based, circular array queue
testQueues(readers, writers, iterations, queues, false); testQueues(readers, writers, iterations, queues, false);
} }
@ -87,9 +74,9 @@ public class QueueBenchmarkTest
final int iterations = 16 * 1024 * 1024; final int iterations = 16 * 1024 * 1024;
final List<Queue<Runnable>> queues = new ArrayList<>(); final List<Queue<Runnable>> queues = new ArrayList<>();
queues.add(new LinkedBlockingQueue<Runnable>()); queues.add(new LinkedBlockingQueue<>());
queues.add(new ArrayBlockingQueue<Runnable>(iterations * writers)); queues.add(new ArrayBlockingQueue<>(iterations * writers));
queues.add(new BlockingArrayQueue<Runnable>(iterations * writers)); queues.add(new BlockingArrayQueue<>(iterations * writers));
testQueues(readers, writers, iterations, queues, true); testQueues(readers, writers, iterations, queues, true);
} }

View File

@ -23,9 +23,8 @@ import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
@ -37,8 +36,8 @@ public class ExecuteProduceConsumeTest
{ {
private static final Runnable NULLTASK = () -> {}; private static final Runnable NULLTASK = () -> {};
private final BlockingQueue<Runnable> _produce = new BlockingArrayQueue<>(); private final BlockingQueue<Runnable> _produce = new LinkedBlockingQueue<>();
private final Queue<Runnable> _executions = new ConcurrentArrayQueue<>(); private final Queue<Runnable> _executions = new LinkedBlockingQueue<>();
private ExecuteProduceConsume _ewyk; private ExecuteProduceConsume _ewyk;
private volatile Thread _producer; private volatile Thread _producer;

View File

@ -19,12 +19,12 @@
package org.eclipse.jetty.websocket.common.extensions; package org.eclipse.jetty.websocket.common.extensions;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
@ -52,7 +52,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
{ {
private static final Logger LOG = Log.getLogger(ExtensionStack.class); private static final Logger LOG = Log.getLogger(ExtensionStack.class);
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private final ExtensionFactory factory; private final ExtensionFactory factory;
private List<Extension> extensions; private List<Extension> extensions;
@ -292,7 +292,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
FrameEntry entry = new FrameEntry(frame,callback,batchMode); FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}",entry); LOG.debug("Queuing {}",entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
@ -317,12 +317,36 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
} }
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private int getQueueSize()
{
synchronized (this)
{
return entries.size();
}
}
@Override @Override
public String toString() public String toString()
{ {
StringBuilder s = new StringBuilder(); StringBuilder s = new StringBuilder();
s.append("ExtensionStack["); s.append("ExtensionStack[");
s.append("queueSize=").append(entries.size()); s.append("queueSize=").append(getQueueSize());
s.append(",extensions="); s.append(",extensions=");
if (extensions == null) if (extensions == null)
{ {
@ -383,7 +407,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
current = entries.poll(); current = pollEntry();
if (current == null) if (current == null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
@ -28,7 +29,6 @@ import java.util.zip.Inflater;
import java.util.zip.ZipException; import java.util.zip.ZipException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -72,7 +72,7 @@ public abstract class CompressExtension extends AbstractExtension
private final static boolean NOWRAP = true; private final static boolean NOWRAP = true;
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private Deflater deflaterImpl; private Deflater deflaterImpl;
private Inflater inflaterImpl; private Inflater inflaterImpl;
@ -171,7 +171,7 @@ public abstract class CompressExtension extends AbstractExtension
return; return;
} }
int read = 0; int read;
while ((read = inflater.inflate(output)) >= 0) while ((read = inflater.inflate(output)) >= 0)
{ {
if (read == 0) if (read == 0)
@ -214,10 +214,26 @@ public abstract class CompressExtension extends AbstractExtension
FrameEntry entry = new FrameEntry(frame,callback,batchMode); FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}",entry); LOG.debug("Queuing {}",entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
protected void notifyCallbackSuccess(WriteCallback callback) protected void notifyCallbackSuccess(WriteCallback callback)
{ {
try try
@ -258,7 +274,7 @@ public abstract class CompressExtension extends AbstractExtension
} }
byte input[]; byte input[];
int inputOffset = 0; int inputOffset;
int len; int len;
if (buf.hasArray()) if (buf.hasArray())
@ -298,7 +314,7 @@ public abstract class CompressExtension extends AbstractExtension
} }
byte input[]; byte input[];
int inputOffset = 0; int inputOffset;
int len; int len;
if (buf.hasArray()) if (buf.hasArray())
@ -408,7 +424,7 @@ public abstract class CompressExtension extends AbstractExtension
{ {
if (finished) if (finished)
{ {
current = entries.poll(); current = pollEntry();
LOG.debug("Processing {}",current); LOG.debug("Processing {}",current);
if (current == null) if (current == null)
return Action.IDLE; return Action.IDLE;
@ -545,7 +561,7 @@ public abstract class CompressExtension extends AbstractExtension
{ {
// Fail all the frames in the queue. // Fail all the frames in the queue.
FrameEntry entry; FrameEntry entry;
while ((entry = entries.poll()) != null) while ((entry = pollEntry()) != null)
notifyCallbackFailure(entry.callback,x); notifyCallbackFailure(entry.callback,x);
} }

View File

@ -20,9 +20,9 @@ package org.eclipse.jetty.websocket.common.extensions.fragment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -41,7 +41,7 @@ public class FragmentExtension extends AbstractExtension
{ {
private static final Logger LOG = Log.getLogger(FragmentExtension.class); private static final Logger LOG = Log.getLogger(FragmentExtension.class);
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private int maxLength; private int maxLength;
@ -71,7 +71,7 @@ public class FragmentExtension extends AbstractExtension
FrameEntry entry = new FrameEntry(frame, callback, batchMode); FrameEntry entry = new FrameEntry(frame, callback, batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry); LOG.debug("Queuing {}", entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
@ -82,6 +82,22 @@ public class FragmentExtension extends AbstractExtension
maxLength = config.getParameter("maxLength", -1); maxLength = config.getParameter("maxLength", -1);
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private static class FrameEntry private static class FrameEntry
{ {
private final Frame frame; private final Frame frame;
@ -112,7 +128,7 @@ public class FragmentExtension extends AbstractExtension
{ {
if (finished) if (finished)
{ {
current = entries.poll(); current = pollEntry();
LOG.debug("Processing {}", current); LOG.debug("Processing {}", current);
if (current == null) if (current == null)
return Action.IDLE; return Action.IDLE;