merged unsafe blocking Q, removed unsafe usage and usage of Q

This commit is contained in:
Greg Wilkins 2013-03-18 13:27:37 +11:00
parent ba9d70589a
commit 5f1980f408
6 changed files with 35 additions and 142 deletions

View File

@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -317,7 +318,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
// TODO private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
private final int _id;
private Selector _selector;
private volatile Thread _thread;

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.server;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayBlockingQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -59,6 +60,7 @@ public class AsyncNCSARequestLog extends NCSARequestLog
super(filename);
if (queue==null)
queue=new BlockingArrayQueue<String>(1024);
// TODO queue=new ConcurrentArrayBlockingQueue.Bounded<String>(1024);
_queue=queue;
}

View File

@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -162,12 +163,10 @@ public abstract class ConcurrentArrayBlockingQueue<E> extends ConcurrentArrayQue
*/
public static class Unbounded<E> extends ConcurrentArrayBlockingQueue<E>
{
private static final int HEAD_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1;
private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getLongsPerCacheLine();
private static final long SIZE_LEFT_OFFSET = MemoryUtils.arrayElementOffset(long[].class, HEAD_OFFSET);
private static final long SIZE_RIGHT_OFFSET = MemoryUtils.arrayElementOffset(long[].class, TAIL_OFFSET);
private final long[] _sizes = new long[TAIL_OFFSET + 1];
private static final int SIZE_LEFT_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1;
private static final int SIZE_RIGHT_OFFSET = SIZE_LEFT_OFFSET + MemoryUtils.getLongsPerCacheLine();
private final AtomicLongArray _sizes = new AtomicLongArray(SIZE_RIGHT_OFFSET+1);
public Unbounded()
{
@ -190,24 +189,24 @@ public abstract class ConcurrentArrayBlockingQueue<E> extends ConcurrentArrayQue
private int getAndIncrementSize()
{
long sizeLeft = MemoryUtils.volatileGetLong(_sizes, SIZE_LEFT_OFFSET);
long sizeRight = MemoryUtils.getAndIncrementLong(_sizes, SIZE_RIGHT_OFFSET);
long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
long sizeRight = _sizes.getAndIncrement(SIZE_RIGHT_OFFSET);
return (int)(sizeRight - sizeLeft);
}
@Override
protected int decrementAndGetSize()
{
long sizeLeft = MemoryUtils.incrementAndGetLong(_sizes, SIZE_LEFT_OFFSET);
long sizeRight = MemoryUtils.volatileGetLong(_sizes, SIZE_RIGHT_OFFSET);
long sizeLeft = _sizes.incrementAndGet(SIZE_LEFT_OFFSET);
long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
return (int)(sizeRight - sizeLeft);
}
@Override
public int size()
{
long sizeLeft = MemoryUtils.volatileGetLong(_sizes, SIZE_LEFT_OFFSET);
long sizeRight = MemoryUtils.volatileGetLong(_sizes, SIZE_RIGHT_OFFSET);
long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
return (int)(sizeRight - sizeLeft);
}

View File

@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -55,11 +56,9 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
};
private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
private static final long HEAD_BLOCK_OFFSET = MemoryUtils.arrayElementOffset(Block[].class, HEAD_OFFSET);
private static final long TAIL_BLOCK_OFFSET = MemoryUtils.arrayElementOffset(Block[].class, TAIL_OFFSET);
private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
private final Block[] _blocks = new Block[TAIL_OFFSET + 1];
private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
private final int _blockSize;
public ConcurrentArrayQueue()
@ -71,8 +70,8 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
{
_blockSize = blockSize;
Block<T> block = newBlock();
MemoryUtils.volatilePutObject(_blocks, HEAD_BLOCK_OFFSET, block);
MemoryUtils.volatilePutObject(_blocks, TAIL_BLOCK_OFFSET, block);
_blocks.set(HEAD_OFFSET,block);
_blocks.set(TAIL_OFFSET,block);
}
public int getBlockSize()
@ -82,12 +81,12 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
protected Block<T> getHeadBlock()
{
return MemoryUtils.volatileGetObject(_blocks, HEAD_BLOCK_OFFSET);
return _blocks.get(HEAD_OFFSET);
}
protected Block<T> getTailBlock()
{
return MemoryUtils.volatileGetObject(_blocks, TAIL_BLOCK_OFFSET);
return _blocks.get(TAIL_OFFSET);
}
@Override
@ -166,7 +165,7 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
protected boolean casTailBlock(Block<T> current, Block<T> update)
{
return MemoryUtils.compareAndSetObject(_blocks, TAIL_BLOCK_OFFSET, current, update);
return _blocks.compareAndSet(TAIL_OFFSET,current,update);
}
@Override
@ -250,7 +249,7 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
protected boolean casHeadBlock(Block<T> current, Block<T> update)
{
return MemoryUtils.compareAndSetObject(_blocks, HEAD_BLOCK_OFFSET, current, update);
return _blocks.compareAndSet(HEAD_OFFSET,current,update);
}
@Override
@ -507,12 +506,12 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
protected static final class Block<E>
{
private static final long headOffset = MemoryUtils.arrayElementOffset(int[].class, HEAD_OFFSET);
private static final long tailOffset = MemoryUtils.arrayElementOffset(int[].class, TAIL_OFFSET);
private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
private final AtomicReferenceArray<Object> elements;
private final AtomicReference<Block<E>> next = new AtomicReference<>();
private final int[] indexes = new int[TAIL_OFFSET + 1];
private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
protected Block(int blockSize)
{
@ -528,7 +527,7 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
{
boolean result = elements.compareAndSet(index, null, item);
if (result)
MemoryUtils.incrementAndGetInt(indexes, tailOffset);
indexes.incrementAndGet(tailOffset);
return result;
}
@ -536,7 +535,7 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
{
boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
if (result && updateHead)
MemoryUtils.incrementAndGetInt(indexes, headOffset);
indexes.incrementAndGet(headOffset);
return result;
}
@ -552,12 +551,12 @@ public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
public int head()
{
return MemoryUtils.volatileGetInt(indexes, headOffset);
return indexes.get(headOffset);
}
public int tail()
{
return MemoryUtils.volatileGetInt(indexes, tailOffset);
return indexes.get(tailOffset);
}
public Object[] arrayCopy()

View File

@ -18,12 +18,8 @@
package org.eclipse.jetty.util;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import sun.misc.Unsafe;
/**
* {@link MemoryUtils} provides an abstraction over memory properties and operations.
@ -53,28 +49,6 @@ public class MemoryUtils
cacheLineBytes = value;
}
private static final Unsafe unsafe;
static
{
try
{
unsafe = AccessController.doPrivileged(new PrivilegedExceptionAction<Unsafe>()
{
@Override
public Unsafe run() throws Exception
{
Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
return (Unsafe)unsafeField.get(null);
}
});
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
private MemoryUtils()
{
}
@ -94,89 +68,4 @@ public class MemoryUtils
return getCacheLineBytes() >> 3;
}
public static long arrayElementOffset(Class<?> arrayClass, int elementOffset)
{
long base = unsafe.arrayBaseOffset(arrayClass);
long scale = unsafe.arrayIndexScale(arrayClass);
return base + scale * elementOffset;
}
public static int volatileGetInt(Object array, long offset)
{
return unsafe.getIntVolatile(array, offset);
}
public static long volatileGetLong(Object array, long offset)
{
return unsafe.getLongVolatile(array, offset);
}
public static int getAndIncrementInt(Object array, long offset)
{
while (true)
{
int current = volatileGetInt(array, offset);
int next = current + 1;
if (compareAndSetInt(array, offset, current, next))
return current;
}
}
public static long getAndIncrementLong(Object array, long offset)
{
while (true)
{
long current = volatileGetLong(array, offset);
long next = current + 1;
if (compareAndSetLong(array, offset, current, next))
return current;
}
}
public static int incrementAndGetInt(Object array, long offset)
{
while (true)
{
int current = volatileGetInt(array, offset);
int next = current + 1;
if (compareAndSetInt(array, offset, current, next))
return next;
}
}
public static long incrementAndGetLong(Object array, long offset)
{
while (true)
{
long current = volatileGetLong(array, offset);
long next = current + 1;
if (compareAndSetLong(array, offset, current, next))
return next;
}
}
public static <R> R volatileGetObject(Object array, long offset)
{
return (R)unsafe.getObjectVolatile(array, offset);
}
public static void volatilePutObject(Object array, long offset, Object element)
{
unsafe.putOrderedObject(array, offset, element);
}
public static boolean compareAndSetObject(Object array, long offset, Object expected, Object value)
{
return unsafe.compareAndSwapObject(array, offset, expected, value);
}
public static boolean compareAndSetInt(Object array, long offset, int expected, int value)
{
return unsafe.compareAndSwapInt(array, offset, expected, value);
}
public static boolean compareAndSetLong(Object array, long offset, long expected, long value)
{
return unsafe.compareAndSwapLong(array, offset, expected, value);
}
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayBlockingQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -91,7 +92,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
setStopTimeout(5000);
if (queue==null)
queue=new BlockingArrayQueue<Runnable>(_minThreads, _minThreads);
queue=new BlockingArrayQueue<>(_minThreads,_minThreads);
// TODO queue=new ConcurrentArrayBlockingQueue.Unbounded<Runnable>();
_jobs=queue;
}