From 4da6ddc94f988f668a652314c46c6da5a06decfe Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 17 Oct 2013 09:14:48 +0200 Subject: [PATCH] 403591 - improve the Blocking Q implementation. Removed ConcurrentArrayBlockingQueue implementations as they were not up to par performance wise. --- .../util/ConcurrentArrayBlockingQueue.java | 418 ------------------ .../jetty/util/thread/QueuedThreadPool.java | 4 +- ...urrentArrayBlockingQueueUnboundedTest.java | 166 ------- .../jetty/util/QueueBenchmarkTest.java | 2 - 4 files changed, 2 insertions(+), 588 deletions(-) delete mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueue.java delete mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueueUnboundedTest.java diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueue.java b/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueue.java deleted file mode 100644 index ca9a109eab1..00000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueue.java +++ /dev/null @@ -1,418 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.Collection; -import java.util.Objects; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Common functionality for a blocking version of {@link ConcurrentArrayQueue}. - * - * @see Unbounded - * @see Bounded - * @param - */ -public abstract class ConcurrentArrayBlockingQueue extends ConcurrentArrayQueue implements BlockingQueue -{ - private final Lock _lock = new ReentrantLock(); - private final Condition _consumer = _lock.newCondition(); - - public ConcurrentArrayBlockingQueue(int blockSize) - { - super(blockSize); - } - - @Override - public E poll() - { - E result = super.poll(); - if (result != null && decrementAndGetSize() > 0) - signalConsumer(); - return result; - } - - @Override - public boolean remove(Object o) - { - boolean result = super.remove(o); - if (result && decrementAndGetSize() > 0) - signalConsumer(); - return result; - } - - protected abstract int decrementAndGetSize(); - - protected void signalConsumer() - { - final Lock lock = _lock; - lock.lock(); - try - { - _consumer.signal(); - } - finally - { - lock.unlock(); - } - } - - @Override - public E take() throws InterruptedException - { - while (true) - { - E result = poll(); - if (result != null) - return result; - - final Lock lock = _lock; - lock.lockInterruptibly(); - try - { - if (size() == 0) - { - _consumer.await(); - } - } - finally - { - lock.unlock(); - } - } - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException - { - long nanos = unit.toNanos(timeout); - - while (true) - { - // TODO should reduce nanos if we spin here - - E result = poll(); - if (result != null) - return result; - - final Lock lock = _lock; - lock.lockInterruptibly(); - try - { - if (size() == 0) - { - if (nanos <= 0) - return null; - nanos = _consumer.awaitNanos(nanos); - } - } - finally - { - lock.unlock(); - } - } - } - - @Override - public int drainTo(Collection c) - { - return drainTo(c, Integer.MAX_VALUE); - } - - @Override - public int drainTo(Collection c, int maxElements) - { - if (c == this) - throw new IllegalArgumentException(); - - int added = 0; - while (added < maxElements) - { - E element = poll(); - if (element == null) - break; - c.add(element); - ++added; - } - return added; - } - - /** - * An unbounded, blocking version of {@link ConcurrentArrayQueue}. - * - * @param - */ - public static class Unbounded extends ConcurrentArrayBlockingQueue - { - private static final int SIZE_LEFT_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1; - private static final int SIZE_RIGHT_OFFSET = SIZE_LEFT_OFFSET + MemoryUtils.getLongsPerCacheLine(); - - private final AtomicLongArray _sizes = new AtomicLongArray(SIZE_RIGHT_OFFSET+1); - - public Unbounded() - { - this(DEFAULT_BLOCK_SIZE); - } - - public Unbounded(int blockSize) - { - super(blockSize); - } - - @Override - public boolean offer(E item) - { - boolean result = super.offer(item); - if (result && getAndIncrementSize() == 0) - signalConsumer(); - return result; - } - - private int getAndIncrementSize() - { - long sizeRight = _sizes.getAndIncrement(SIZE_RIGHT_OFFSET); - long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET); - return (int)(sizeRight - sizeLeft); - } - - @Override - protected int decrementAndGetSize() - { - long sizeLeft = _sizes.incrementAndGet(SIZE_LEFT_OFFSET); - long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET); - return (int)(sizeRight - sizeLeft); - } - - @Override - public int size() - { - long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET); - long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET); - return (int)(sizeRight - sizeLeft); - } - - @Override - public int remainingCapacity() - { - return Integer.MAX_VALUE; - } - - @Override - public void put(E element) throws InterruptedException - { - offer(element); - } - - @Override - public boolean offer(E element, long timeout, TimeUnit unit) throws InterruptedException - { - return offer(element); - } - } - - /** - * A bounded, blocking version of {@link ConcurrentArrayQueue}. - * - * @param - */ - public static class Bounded extends ConcurrentArrayBlockingQueue - { - private final AtomicInteger _size = new AtomicInteger(); - private final Lock _lock = new ReentrantLock(); - private final Condition _producer = _lock.newCondition(); - private final int _capacity; - - public Bounded(int capacity) - { - this(DEFAULT_BLOCK_SIZE, capacity); - } - - public Bounded(int blockSize, int capacity) - { - super(blockSize); - this._capacity = capacity; - } - - @Override - public boolean offer(E item) - { - while (true) - { - int size = size(); - int nextSize = size + 1; - - if (nextSize > _capacity) - return false; - - if (_size.compareAndSet(size, nextSize)) - { - if (super.offer(item)) - { - if (size == 0) - signalConsumer(); - return true; - } - else - { - decrementAndGetSize(); - } - } - } - } - - @Override - public E poll() - { - E result = super.poll(); - if (result != null) - signalProducer(); - return result; - } - - @Override - public boolean remove(Object o) - { - boolean result = super.remove(o); - if (result) - signalProducer(); - return result; - } - - @Override - protected int decrementAndGetSize() - { - return _size.decrementAndGet(); - } - - @Override - public int size() - { - return _size.get(); - } - - @Override - public int remainingCapacity() - { - return _capacity - size(); - } - - @Override - public void put(E item) throws InterruptedException - { - item = Objects.requireNonNull(item); - - while (true) - { - final Lock lock = _lock; - lock.lockInterruptibly(); - try - { - if (size() == _capacity) - _producer.await(); - } - finally - { - lock.unlock(); - } - if (offer(item)) - break; - } - } - - @Override - public boolean offer(E item, long timeout, TimeUnit unit) throws InterruptedException - { - item = Objects.requireNonNull(item); - - long nanos = unit.toNanos(timeout); - while (true) - { - final Lock lock = _lock; - lock.lockInterruptibly(); - try - { - if (size() == _capacity) - { - if (nanos <= 0) - return false; - nanos = _producer.awaitNanos(nanos); - } - } - finally - { - lock.unlock(); - } - if (offer(item)) - break; - } - - return true; - } - - @Override - public int drainTo(Collection c, int maxElements) - { - int result = super.drainTo(c, maxElements); - if (result > 0) - signalProducers(); - return result; - } - - @Override - public void clear() - { - super.clear(); - signalProducers(); - } - - private void signalProducer() - { - final Lock lock = _lock; - lock.lock(); - try - { - _producer.signal(); - } - finally - { - lock.unlock(); - } - } - - private void signalProducers() - { - final Lock lock = _lock; - lock.lock(); - try - { - _producer.signalAll(); - } - finally - { - lock.unlock(); - } - } - } -} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 7c8b5e665fd..c116739f49d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -242,7 +242,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @param name Name of the BoundedThreadPool to use when naming Threads. + * @param name Name of this thread pool to use when naming threads. */ public void setName(String name) { @@ -303,7 +303,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @return The name of the BoundedThreadPool. + * @return The name of the this thread pool */ @ManagedAttribute("name of the thread pool") public String getName() diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueueUnboundedTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueueUnboundedTest.java deleted file mode 100644 index 8545f91f057..00000000000 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentArrayBlockingQueueUnboundedTest.java +++ /dev/null @@ -1,166 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.toolchain.test.TestTracker; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; - -public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQueueTest -{ - @Rule - public final TestTracker tracker = new TestTracker(); - - @Override - protected ConcurrentArrayBlockingQueue newConcurrentArrayQueue(int blockSize) - { - return new ConcurrentArrayBlockingQueue.Unbounded<>(blockSize); - } - - @Test - public void testOfferTake() throws Exception - { - ConcurrentArrayBlockingQueue queue = newConcurrentArrayQueue(32); - Integer item = 1; - Assert.assertTrue(queue.offer(item)); - Integer result = queue.take(); - Assert.assertSame(item, result); - } - - @Test - public void testTimedPollOffer() throws Exception - { - final ConcurrentArrayBlockingQueue queue = newConcurrentArrayQueue(32); - - final long timeout = 1000; - final Integer item = 1; - new Thread() - { - @Override - public void run() - { - try - { - TimeUnit.MILLISECONDS.sleep(timeout); - queue.offer(item); - } - catch (InterruptedException x) - { - x.printStackTrace(); - } - } - }.start(); - - Integer result = queue.poll(2 * timeout, TimeUnit.MILLISECONDS); - Assert.assertNotNull(result); - } - - @Test - public void testConcurrentOfferTake() throws Exception - { - final ConcurrentArrayBlockingQueue queue = newConcurrentArrayQueue(512); - int readerCount = 16; - final int factor = 2; - int writerCount = readerCount * factor; - final int iterations = 4096; - for (int runs = 0; runs < 16; ++runs) - { - ExecutorService executor = Executors.newFixedThreadPool(readerCount + writerCount); - List> readers = new ArrayList<>(); - for (int i = 0; i < readerCount / 2; ++i) - { - final int reader = i; - readers.add(executor.submit(new Callable() - { - @Override - public Integer call() throws Exception - { - int sum = 0; - for (int j = 0; j < iterations * factor; ++j) - sum += queue.take(); - //System.err.println("Taking reader " + reader + " completed: " + sum); - return sum; - } - })); - readers.add(executor.submit(new Callable() - { - @Override - public Integer call() throws Exception - { - int sum = 0; - for (int j = 0; j < iterations * factor; ++j) - sum += queue.poll(5, TimeUnit.SECONDS); - //System.err.println("Polling Reader " + reader + " completed: " + sum); - return sum; - } - })); - } - for (int i = 0; i < writerCount; ++i) - { - final int writer = i; - executor.submit(new Callable() - { - @Override - public Object call() throws Exception - { - for (int j = 0; j < iterations; ++j) - queue.offer(1); - //System.err.println("Writer " + writer + " completed"); - return null; - } - }); - } - - int sum = 0; - for (Future result : readers) - sum += result.get(); - - Assert.assertEquals(writerCount * iterations, sum); - Assert.assertTrue(queue.isEmpty()); - } - } - - @Test - public void testDrain() throws Exception - { - final ConcurrentArrayBlockingQueue queue = newConcurrentArrayQueue(512); - List chunk1 = Arrays.asList(1, 2); - List chunk2 = Arrays.asList(3, 4, 5); - queue.addAll(chunk1); - queue.addAll(chunk2); - - List drainer1 = new ArrayList<>(); - queue.drainTo(drainer1, chunk1.size()); - List drainer2 = new ArrayList<>(); - queue.drainTo(drainer2, chunk2.size()); - - Assert.assertEquals(chunk1, drainer1); - Assert.assertEquals(chunk2, drainer2); - } -} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/QueueBenchmarkTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/QueueBenchmarkTest.java index 750c288f61b..046aff52f43 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/QueueBenchmarkTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/QueueBenchmarkTest.java @@ -87,8 +87,6 @@ public class QueueBenchmarkTest final int iterations = 16 * 1024 * 1024; final List> queues = new ArrayList<>(); - queues.add(new ConcurrentArrayBlockingQueue.Unbounded()); - queues.add(new ConcurrentArrayBlockingQueue.Bounded(iterations * writers)); queues.add(new LinkedBlockingQueue()); queues.add(new ArrayBlockingQueue(iterations * writers)); queues.add(new BlockingArrayQueue(iterations * writers));