Upgrade to latest jst166y and jsr166e

Embed the code now in our source, since jsr166e jar generation with 1.6 instead of 1.7 is complicated when doing it on its own as it relies on ThreadLocalRandom, and we have it in jsr166y
This commit is contained in:
Shay Banon 2012-06-10 00:42:54 +02:00
parent 43483b1237
commit 8f0bc799c6
25 changed files with 17102 additions and 16 deletions

16
pom.xml
View File

@ -100,20 +100,6 @@
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>es-jsr166y</artifactId>
<version>20120131</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>es-jsr166e</artifactId>
<version>20120131</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@ -314,8 +300,6 @@
<includes>
<include>com.google.guava:guava</include>
<include>net.sf.trove4j:trove4j</include>
<include>org.elasticsearch:es-jsr166y</include>
<include>org.elasticsearch:es-jsr166e</include>
<include>org.mvel:mvel2</include>
<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-smile</include>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,201 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* One or more variables that together maintain an initially zero
* {@code double} sum. When updates (method {@link #add}) are
* contended across threads, the set of variables may grow dynamically
* to reduce contention. Method {@link #sum} (or, equivalently {@link
* #doubleValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code hashCode} and {@code compareTo} because
* instances are expected to be mutated, and so are not useful as
* collection keys.
*
* <p><em>jsr166e note: This class is targeted to be placed in
* java.util.concurrent.atomic<em>
*
* @since 1.8
* @author Doug Lea
*/
public class DoubleAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Update function. Note that we must use "long" for underlying
* representations, because there is no compareAndSet for double,
* due to the fact that the bitwise equals used in any CAS
* implementation is not the same as double-precision equals.
* However, we use CAS only to detect and alleviate contention,
* for which bitwise equals works best anyway. In principle, the
* long/double conversions used here should be essentially free on
* most platforms since they just re-interpret bits.
*
* Similar conversions are used in other methods.
*/
final long fn(long v, long x) {
return Double.doubleToRawLongBits
(Double.longBitsToDouble(v) +
Double.longBitsToDouble(x));
}
/**
* Creates a new adder with initial sum of zero.
*/
public DoubleAdder() {
}
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(double x) {
Cell[] as; long b, v; HashCode hc; Cell a; int n;
if ((as = cells) != null ||
!casBase(b = base,
Double.doubleToRawLongBits
(Double.longBitsToDouble(b) + x))) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
!(uncontended = a.cas(v = a.value,
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x))))
retryUpdate(Double.doubleToRawLongBits(x), hc, uncontended);
}
}
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot: Invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated. Also, because double-precision arithmetic is not
* strictly associative, the returned result need not be identical
* to the value that would be obtained in a sequential series of
* updates to a single variable.
*
* @return the sum
*/
public double sum() {
Cell[] as = cells;
double sum = Double.longBitsToDouble(base);
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null)
sum += Double.longBitsToDouble(a.value);
}
}
return sum;
}
/**
* Resets variables maintaining the sum to zero. This method may
* be a useful alternative to creating a new adder, but is only
* effective if there are no concurrent updates. Because this
* method is intrinsically racy, it should only be used when it is
* known that no threads are concurrently updating.
*/
public void reset() {
internalReset(0L);
}
/**
* Equivalent in effect to {@link #sum} followed by {@link
* #reset}. This method may apply for example during quiescent
* points between multithreaded computations. If there are
* updates concurrent with this method, the returned value is
* <em>not</em> guaranteed to be the final value occurring before
* the reset.
*
* @return the sum
*/
public double sumThenReset() {
Cell[] as = cells;
double sum = Double.longBitsToDouble(base);
base = 0L;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
long v = a.value;
a.value = 0L;
sum += Double.longBitsToDouble(v);
}
}
}
return sum;
}
/**
* Returns the String representation of the {@link #sum}.
* @return the String representation of the {@link #sum}
*/
public String toString() {
return Double.toString(sum());
}
/**
* Equivalent to {@link #sum}.
*
* @return the sum
*/
public double doubleValue() {
return sum();
}
/**
* Returns the {@link #sum} as a {@code long} after a
* narrowing primitive conversion.
*/
public long longValue() {
return (long)sum();
}
/**
* Returns the {@link #sum} as an {@code int} after a
* narrowing primitive conversion.
*/
public int intValue() {
return (int)sum();
}
/**
* Returns the {@link #sum} as a {@code float}
* after a narrowing primitive conversion.
*/
public float floatValue() {
return (float)sum();
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeDouble(sum());
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = Double.doubleToRawLongBits(s.readDouble());
}
}

View File

@ -0,0 +1,196 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* One or more variables that together maintain a running {@code double}
* maximum with initial value {@code Double.NEGATIVE_INFINITY}. When
* updates (method {@link #update}) are contended across threads, the
* set of variables may grow dynamically to reduce contention. Method
* {@link #max} (or, equivalently, {@link #doubleValue}) returns the
* current maximum across the variables maintaining updates.
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code hashCode} and {@code compareTo} because
* instances are expected to be mutated, and so are not useful as
* collection keys.
*
* <p><em>jsr166e note: This class is targeted to be placed in
* java.util.concurrent.atomic<em>
*
* @since 1.8
* @author Doug Lea
*/
public class DoubleMaxUpdater extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Long representation of negative infinity. See class Double
* internal documentation for explanation.
*/
private static final long MIN_AS_LONG = 0xfff0000000000000L;
/**
* Update function. See class DoubleAdder for rationale
* for using conversions from/to long.
*/
final long fn(long v, long x) {
return Double.longBitsToDouble(v) > Double.longBitsToDouble(x) ? v : x;
}
/**
* Creates a new instance with initial value of {@code
* Double.NEGATIVE_INFINITY}.
*/
public DoubleMaxUpdater() {
base = MIN_AS_LONG;
}
/**
* Updates the maximum to be at least the given value.
*
* @param x the value to update
*/
public void update(double x) {
long lx = Double.doubleToRawLongBits(x);
Cell[] as; long b, v; HashCode hc; Cell a; int n;
if ((as = cells) != null ||
(Double.longBitsToDouble(b = base) < x && !casBase(b, lx))) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
(Double.longBitsToDouble(v = a.value) < x &&
!(uncontended = a.cas(v, lx))))
retryUpdate(lx, hc, uncontended);
}
}
/**
* Returns the current maximum. The returned value is
* <em>NOT</em> an atomic snapshot: Invocation in the absence of
* concurrent updates returns an accurate result, but concurrent
* updates that occur while the value is being calculated might
* not be incorporated.
*
* @return the maximum
*/
public double max() {
Cell[] as = cells;
double max = Double.longBitsToDouble(base);
if (as != null) {
int n = as.length;
double v;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null && (v = Double.longBitsToDouble(a.value)) > max)
max = v;
}
}
return max;
}
/**
* Resets variables maintaining updates to {@code
* Double.NEGATIVE_INFINITY}. This method may be a useful
* alternative to creating a new updater, but is only effective if
* there are no concurrent updates. Because this method is
* intrinsically racy, it should only be used when it is known
* that no threads are concurrently updating.
*/
public void reset() {
internalReset(MIN_AS_LONG);
}
/**
* Equivalent in effect to {@link #max} followed by {@link
* #reset}. This method may apply for example during quiescent
* points between multithreaded computations. If there are
* updates concurrent with this method, the returned value is
* <em>not</em> guaranteed to be the final value occurring before
* the reset.
*
* @return the maximum
*/
public double maxThenReset() {
Cell[] as = cells;
double max = Double.longBitsToDouble(base);
base = MIN_AS_LONG;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
double v = Double.longBitsToDouble(a.value);
a.value = MIN_AS_LONG;
if (v > max)
max = v;
}
}
}
return max;
}
/**
* Returns the String representation of the {@link #max}.
* @return the String representation of the {@link #max}
*/
public String toString() {
return Double.toString(max());
}
/**
* Equivalent to {@link #max}.
*
* @return the max
*/
public double doubleValue() {
return max();
}
/**
* Returns the {@link #max} as a {@code long} after a
* narrowing primitive conversion.
*/
public long longValue() {
return (long)max();
}
/**
* Returns the {@link #max} as an {@code int} after a
* narrowing primitive conversion.
*/
public int intValue() {
return (int)max();
}
/**
* Returns the {@link #max} as a {@code float}
* after a narrowing primitive conversion.
*/
public float floatValue() {
return (float)max();
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeDouble(max());
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = Double.doubleToRawLongBits(s.readDouble());
}
}

View File

@ -0,0 +1,202 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* One or more variables that together maintain an initially zero
* {@code long} sum. When updates (method {@link #add}) are contended
* across threads, the set of variables may grow dynamically to reduce
* contention. Method {@link #sum} (or, equivalently, {@link
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p> This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
* characteristics. But under high contention, expected throughput of
* this class is significantly higher, at the expense of higher space
* consumption.
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code hashCode} and {@code compareTo} because
* instances are expected to be mutated, and so are not useful as
* collection keys.
*
* <p><em>jsr166e note: This class is targeted to be placed in
* java.util.concurrent.atomic<em>
*
* @since 1.8
* @author Doug Lea
*/
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Version of plus for use in retryUpdate
*/
final long fn(long v, long x) { return v + x; }
/**
* Creates a new adder with initial sum of zero.
*/
public LongAdder() {
}
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; HashCode hc; Cell a; int n;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
retryUpdate(x, hc, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}
/**
* Equivalent to {@code add(-1)}.
*/
public void decrement() {
add(-1L);
}
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot: Invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
public long sum() {
long sum = base;
Cell[] as = cells;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null)
sum += a.value;
}
}
return sum;
}
/**
* Resets variables maintaining the sum to zero. This method may
* be a useful alternative to creating a new adder, but is only
* effective if there are no concurrent updates. Because this
* method is intrinsically racy, it should only be used when it is
* known that no threads are concurrently updating.
*/
public void reset() {
internalReset(0L);
}
/**
* Equivalent in effect to {@link #sum} followed by {@link
* #reset}. This method may apply for example during quiescent
* points between multithreaded computations. If there are
* updates concurrent with this method, the returned value is
* <em>not</em> guaranteed to be the final value occurring before
* the reset.
*
* @return the sum
*/
public long sumThenReset() {
long sum = base;
Cell[] as = cells;
base = 0L;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
/**
* Returns the String representation of the {@link #sum}.
* @return the String representation of the {@link #sum}
*/
public String toString() {
return Long.toString(sum());
}
/**
* Equivalent to {@link #sum}.
*
* @return the sum
*/
public long longValue() {
return sum();
}
/**
* Returns the {@link #sum} as an {@code int} after a narrowing
* primitive conversion.
*/
public int intValue() {
return (int)sum();
}
/**
* Returns the {@link #sum} as a {@code float}
* after a widening primitive conversion.
*/
public float floatValue() {
return (float)sum();
}
/**
* Returns the {@link #sum} as a {@code double} after a widening
* primitive conversion.
*/
public double doubleValue() {
return (double)sum();
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeLong(sum());
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = s.readLong();
}
}

View File

@ -0,0 +1,189 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import jsr166e.LongAdder;
import java.util.Map;
import java.util.Set;
import java.io.Serializable;
/**
* A keyed table of adders, that may be useful in computing frequency
* counts and histograms, or may be used as a form of multiset. A
* {@link LongAdder} is associated with each key. Keys are added to
* the table implicitly upon any attempt to update, or may be added
* explicitly using method {@link #install}.
*
* <p><em>jsr166e note: This class is targeted to be placed in
* java.util.concurrent.atomic<em>
*
* @since 1.8
* @author Doug Lea
*/
public class LongAdderTable<K> implements Serializable {
/** Relies on default serialization */
private static final long serialVersionUID = 7249369246863182397L;
/** The underlying map */
private final ConcurrentHashMapV8<K, LongAdder> map;
static final class CreateAdder
implements ConcurrentHashMapV8.MappingFunction<Object, LongAdder> {
public LongAdder map(Object unused) { return new LongAdder(); }
}
private static final CreateAdder createAdder = new CreateAdder();
/**
* Creates a new empty table.
*/
public LongAdderTable() {
map = new ConcurrentHashMapV8<K, LongAdder>();
}
/**
* If the given key does not already exist in the table, inserts
* the key with initial sum of zero; in either case returning the
* adder associated with this key.
*
* @param key the key
* @return the adder associated with the key
*/
public LongAdder install(K key) {
return map.computeIfAbsent(key, createAdder);
}
/**
* Adds the given value to the sum associated with the given
* key. If the key does not already exist in the table, it is
* inserted.
*
* @param key the key
* @param x the value to add
*/
public void add(K key, long x) {
map.computeIfAbsent(key, createAdder).add(x);
}
/**
* Increments the sum associated with the given key. If the key
* does not already exist in the table, it is inserted.
*
* @param key the key
*/
public void increment(K key) { add(key, 1L); }
/**
* Decrements the sum associated with the given key. If the key
* does not already exist in the table, it is inserted.
*
* @param key the key
*/
public void decrement(K key) { add(key, -1L); }
/**
* Returns the sum associated with the given key, or zero if the
* key does not currently exist in the table.
*
* @param key the key
* @return the sum associated with the key, or zero if the key is
* not in the table
*/
public long sum(K key) {
LongAdder a = map.get(key);
return a == null ? 0L : a.sum();
}
/**
* Resets the sum associated with the given key to zero if the key
* exists in the table. This method does <em>NOT</em> add or
* remove the key from the table (see {@link #remove}).
*
* @param key the key
*/
public void reset(K key) {
LongAdder a = map.get(key);
if (a != null)
a.reset();
}
/**
* Resets the sum associated with the given key to zero if the key
* exists in the table. This method does <em>NOT</em> add or
* remove the key from the table (see {@link #remove}).
*
* @param key the key
* @return the previous sum, or zero if the key is not
* in the table
*/
public long sumThenReset(K key) {
LongAdder a = map.get(key);
return a == null ? 0L : a.sumThenReset();
}
/**
* Returns the sum totalled across all keys.
*
* @return the sum totalled across all keys
*/
public long sumAll() {
long sum = 0L;
for (LongAdder a : map.values())
sum += a.sum();
return sum;
}
/**
* Resets the sum associated with each key to zero.
*/
public void resetAll() {
for (LongAdder a : map.values())
a.reset();
}
/**
* Totals, then resets, the sums associated with all keys.
*
* @return the sum totalled across all keys
*/
public long sumThenResetAll() {
long sum = 0L;
for (LongAdder a : map.values())
sum += a.sumThenReset();
return sum;
}
/**
* Removes the given key from the table.
*
* @param key the key
*/
public void remove(K key) { map.remove(key); }
/**
* Removes all keys from the table.
*/
public void removeAll() { map.clear(); }
/**
* Returns the current set of keys.
*
* @return the current set of keys
*/
public Set<K> keySet() {
return map.keySet();
}
/**
* Returns the current set of key-value mappings.
*
* @return the current set of key-value mappings
*/
public Set<Map.Entry<K,LongAdder>> entrySet() {
return map.entrySet();
}
}

View File

@ -0,0 +1,186 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* One or more variables that together maintain a running {@code long}
* maximum with initial value {@code Long.MIN_VALUE}. When updates
* (method {@link #update}) are contended across threads, the set of
* variables may grow dynamically to reduce contention. Method {@link
* #max} (or, equivalently, {@link #longValue}) returns the current
* maximum across the variables maintaining updates.
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code hashCode} and {@code compareTo} because
* instances are expected to be mutated, and so are not useful as
* collection keys.
*
* <p><em>jsr166e note: This class is targeted to be placed in
* java.util.concurrent.atomic<em>
*
* @since 1.8
* @author Doug Lea
*/
public class LongMaxUpdater extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Version of max for use in retryUpdate
*/
final long fn(long v, long x) { return v > x ? v : x; }
/**
* Creates a new instance with initial maximum of {@code
* Long.MIN_VALUE}.
*/
public LongMaxUpdater() {
base = Long.MIN_VALUE;
}
/**
* Updates the maximum to be at least the given value.
*
* @param x the value to update
*/
public void update(long x) {
Cell[] as; long b, v; HashCode hc; Cell a; int n;
if ((as = cells) != null ||
(b = base) < x && !casBase(b, x)) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
((v = a.value) < x && !(uncontended = a.cas(v, x))))
retryUpdate(x, hc, uncontended);
}
}
/**
* Returns the current maximum. The returned value is
* <em>NOT</em> an atomic snapshot: Invocation in the absence of
* concurrent updates returns an accurate result, but concurrent
* updates that occur while the value is being calculated might
* not be incorporated.
*
* @return the maximum
*/
public long max() {
Cell[] as = cells;
long max = base;
if (as != null) {
int n = as.length;
long v;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null && (v = a.value) > max)
max = v;
}
}
return max;
}
/**
* Resets variables maintaining updates to {@code Long.MIN_VALUE}.
* This method may be a useful alternative to creating a new
* updater, but is only effective if there are no concurrent
* updates. Because this method is intrinsically racy, it should
* only be used when it is known that no threads are concurrently
* updating.
*/
public void reset() {
internalReset(Long.MIN_VALUE);
}
/**
* Equivalent in effect to {@link #max} followed by {@link
* #reset}. This method may apply for example during quiescent
* points between multithreaded computations. If there are
* updates concurrent with this method, the returned value is
* <em>not</em> guaranteed to be the final value occurring before
* the reset.
*
* @return the maximum
*/
public long maxThenReset() {
Cell[] as = cells;
long max = base;
base = Long.MIN_VALUE;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
long v = a.value;
a.value = Long.MIN_VALUE;
if (v > max)
max = v;
}
}
}
return max;
}
/**
* Returns the String representation of the {@link #max}.
* @return the String representation of the {@link #max}
*/
public String toString() {
return Long.toString(max());
}
/**
* Equivalent to {@link #max}.
*
* @return the maximum
*/
public long longValue() {
return max();
}
/**
* Returns the {@link #max} as an {@code int} after a narrowing
* primitive conversion.
*/
public int intValue() {
return (int)max();
}
/**
* Returns the {@link #max} as a {@code float}
* after a widening primitive conversion.
*/
public float floatValue() {
return (float)max();
}
/**
* Returns the {@link #max} as a {@code double} after a widening
* primitive conversion.
*/
public double doubleValue() {
return (double)max();
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeLong(max());
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = s.readLong();
}
}

View File

@ -0,0 +1,639 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
import java.util.Collection;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.IOException;
/**
* A reentrant mutual exclusion {@link Lock} in which each lock
* acquisition or release advances a sequence number. When the
* sequence number (accessible using {@link #getSequence()}) is odd,
* the lock is held. When it is even (i.e., ({@code lock.getSequence()
* & 1L) == 0L}), the lock is released. Method {@link
* #awaitAvailability} can be used to await availability of the lock,
* returning its current sequence number. Sequence numbers (as well as
* reentrant hold counts) are of type {@code long} to ensure that they
* will not wrap around until hundreds of years of use under current
* processor rates. A SequenceLock can be created with a specified
* number of spins. Attempts to acquire the lock in method {@link
* #lock} will retry at least the given number of times before
* blocking. If not specified, a default, possibly platform-specific,
* value is used.
*
* <p>Except for the lack of support for specified fairness policies,
* or {@link Condition} objects, a SequenceLock can be used in the
* same way as {@link ReentrantLock}. It provides similar status and
* monitoring methods, such as {@link #isHeldByCurrentThread}.
* SequenceLocks may be preferable in contexts in which multiple
* threads invoke short read-only methods much more frequently than
* fully locked methods.
*
* <p> Methods {@code awaitAvailability} and {@code getSequence} can
* be used together to define (partially) optimistic read-only methods
* that are usually more efficient than ReadWriteLocks when they
* apply. These methods should in general be structured as loops that
* await lock availability, then read {@code volatile} fields into
* local variables (and may further read other values derived from
* these, for example the {@code length} of a {@code volatile} array),
* and retry if the sequence number changed while doing so.
* Alternatively, because {@code awaitAvailability} accommodates
* reentrancy, a method can retry a bounded number of times before
* switching to locking mode. While conceptually straightforward,
* expressing these ideas can be verbose. For example:
*
* <pre> {@code
* class Point {
* private volatile double x, y;
* private final SequenceLock sl = new SequenceLock();
*
* // an exclusively locked method
* void move(double deltaX, double deltaY) {
* sl.lock();
* try {
* x += deltaX;
* y += deltaY;
* } finally {
* sl.unlock();
* }
* }
*
* // A read-only method
* double distanceFromOriginV1() {
* double currentX, currentY;
* long seq;
* do {
* seq = sl.awaitAvailability();
* currentX = x;
* currentY = y;
* } while (sl.getSequence() != seq); // retry if sequence changed
* return Math.sqrt(currentX * currentX + currentY * currentY);
* }
*
* // Uses bounded retries before locking
* double distanceFromOriginV2() {
* double currentX, currentY;
* long seq;
* int retries = RETRIES_BEFORE_LOCKING; // for example 8
* try {
* do {
* if (--retries < 0)
* sl.lock();
* seq = sl.awaitAvailability();
* currentX = x;
* currentY = y;
* } while (sl.getSequence() != seq);
* } finally {
* if (retries < 0)
* sl.unlock();
* }
* return Math.sqrt(currentX * currentX + currentY * currentY);
* }
* }}</pre>
*
* @since 1.8
* @author Doug Lea
*/
public class SequenceLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
static final class Sync extends AbstractQueuedLongSynchronizer {
static final long serialVersionUID = 2540673546047039555L;
/**
* The number of times to spin in lock() and awaitAvailability().
*/
final int spins;
/**
* The number of reentrant holds on this lock. Uses a long for
* compatibility with other AbstractQueuedLongSynchronizer
* operations. Accessed only by lock holder.
*/
long holds;
Sync(int spins) { this.spins = spins; }
// overrides of AQLS methods
public final boolean isHeldExclusively() {
return (getState() & 1L) != 0L &&
getExclusiveOwnerThread() == Thread.currentThread();
}
public final boolean tryAcquire(long acquires) {
Thread current = Thread.currentThread();
long c = getState();
if ((c & 1L) == 0L) {
if (compareAndSetState(c, c + 1L)) {
holds = acquires;
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
holds += acquires;
return true;
}
return false;
}
public final boolean tryRelease(long releases) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
if ((holds -= releases) == 0L) {
setExclusiveOwnerThread(null);
setState(getState() + 1L);
return true;
}
return false;
}
public final long tryAcquireShared(long unused) {
return (((getState() & 1L) == 0L) ? 1L :
(getExclusiveOwnerThread() == Thread.currentThread()) ? 0L:
-1L);
}
public final boolean tryReleaseShared(long unused) {
return (getState() & 1L) == 0L;
}
public final Condition newCondition() {
throw new UnsupportedOperationException();
}
// Other methods in support of SequenceLock
final long getSequence() {
return getState();
}
final void lock() {
int k = spins;
while (!tryAcquire(1L)) {
if (k == 0) {
acquire(1L);
break;
}
--k;
}
}
final long awaitAvailability() {
long s;
while (((s = getState()) & 1L) != 0L &&
getExclusiveOwnerThread() != Thread.currentThread()) {
acquireShared(1L);
releaseShared(1L);
}
return s;
}
final long tryAwaitAvailability(long nanos)
throws InterruptedException, TimeoutException {
Thread current = Thread.currentThread();
for (;;) {
long s = getState();
if ((s & 1L) == 0L || getExclusiveOwnerThread() == current) {
releaseShared(1L);
return s;
}
if (!tryAcquireSharedNanos(1L, nanos))
throw new TimeoutException();
// since tryAcquireSharedNanos doesn't return seq
// retry with minimal wait time.
nanos = 1L;
}
}
final boolean isLocked() {
return (getState() & 1L) != 0L;
}
final Thread getOwner() {
return (getState() & 1L) == 0L ? null : getExclusiveOwnerThread();
}
final long getHoldCount() {
return isHeldExclusively() ? holds : 0;
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
holds = 0L;
setState(0L); // reset to unlocked state
}
}
private final Sync sync;
/**
* The default spin value for constructor. Future versions of this
* class might choose platform-specific values. Currently, except
* on uniprocessors, it is set to a small value that overcomes near
* misses between releases and acquires.
*/
static final int DEFAULT_SPINS =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 0;
/**
* Creates an instance of {@code SequenceLock} with the default
* number of retry attempts to acquire the lock before blocking.
*/
public SequenceLock() { sync = new Sync(DEFAULT_SPINS); }
/**
* Creates an instance of {@code SequenceLock} that will retry
* attempts to acquire the lock at least the given number times
* before blocking.
*/
public SequenceLock(int spins) { sync = new Sync(spins); }
/**
* Returns the current sequence number of this lock. The sequence
* number is advanced upon each acquire or release action. When
* this value is odd, the lock is held; when even, it is released.
*
* @return the current sequence number
*/
public long getSequence() { return sync.getSequence(); }
/**
* Returns the current sequence number when the lock is, or
* becomes, available. A lock is available if it is either
* released, or is held by the current thread. If the lock is not
* available, the current thread becomes disabled for thread
* scheduling purposes and lies dormant until the lock has been
* released by some other thread.
*
* @return the current sequence number
*/
public long awaitAvailability() { return sync.awaitAvailability(); }
/**
* Returns the current sequence number if the lock is, or
* becomes, available within the specified waiting time.
*
* <p>If the lock is not available, the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
*
* <ul>
*
* <li>The lock becomes available, in which case the current
* sequence number is returned.
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread, in which case this method throws
* {@link InterruptedException}.
*
* <li>The specified waiting time elapses, in which case
* this method throws {@link TimeoutException}.
*
* </ul>
*
* @param timeout the time to wait for availability
* @param unit the time unit of the timeout argument
* @return the current sequence number if the lock is available
* upon return from this method
* @throws InterruptedException if the current thread is interrupted
* @throws TimeoutException if the lock was not available within
* the specified waiting time
* @throws NullPointerException if the time unit is null
*/
public long tryAwaitAvailability(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
return sync.tryAwaitAvailability(unit.toNanos(timeout));
}
/**
* Acquires the lock.
*
* <p>If the current thread already holds this lock then the hold count
* is incremented by one and the method returns immediately without
* incrementing the sequence number.
*
* <p>If this lock not held by another thread, this method
* increments the sequence number (which thus becomes an odd
* number), sets the lock hold count to one, and returns
* immediately.
*
* <p>If the lock is held by another thread then the current
* thread may retry acquiring this lock, depending on the {@code
* spin} count established in constructor. If the lock is still
* not acquired, the current thread becomes disabled for thread
* scheduling purposes and lies dormant until enabled by
* some other thread releasing the lock.
*/
public void lock() { sync.lock(); }
/**
* Acquires the lock unless the current thread is
* {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current thread already holds this lock then the hold count
* is incremented by one and the method returns immediately without
* incrementing the sequence number.
*
* <p>If this lock not held by another thread, this method
* increments the sequence number (which thus becomes an odd
* number), sets the lock hold count to one, and returns
* immediately.
*
* <p>If the lock is held by another thread then the current
* thread may retry acquiring this lock, depending on the {@code
* spin} count established in constructor. If the lock is still
* not acquired, the current thread becomes disabled for thread
* scheduling purposes and lies dormant until one of two things
* happens:
*
* <ul>
*
* <li>The lock is acquired by the current thread; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts} the
* current thread.
*
* </ul>
*
* <p>If the lock is acquired by the current thread then the lock hold
* count is set to one and the sequence number is incremented.
*
* <p>If the current thread:
*
* <ul>
*
* <li>has its interrupted status set on entry to this method; or
*
* <li>is {@linkplain Thread#interrupt interrupted} while acquiring
* the lock,
*
* </ul>
*
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>In this implementation, as this method is an explicit
* interruption point, preference is given to responding to the
* interrupt over normal or reentrant acquisition of the lock.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1L);
}
/**
* Acquires the lock only if it is not held by another thread at the time
* of invocation.
*
* <p>If the current thread already holds this lock then the hold
* count is incremented by one and the method returns {@code true}
* without incrementing the sequence number.
*
* <p>If this lock not held by another thread, this method
* increments the sequence number (which thus becomes an odd
* number), sets the lock hold count to one, and returns {@code
* true}.
*
* <p>If the lock is held by another thread then this method
* returns {@code false}.
*
* @return {@code true} if the lock was free and was acquired by the
* current thread, or the lock was already held by the current
* thread; and {@code false} otherwise
*/
public boolean tryLock() { return sync.tryAcquire(1L); }
/**
* Acquires the lock if it is not held by another thread within the given
* waiting time and the current thread has not been
* {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current thread already holds this lock then the hold count
* is incremented by one and the method returns immediately without
* incrementing the sequence number.
*
* <p>If this lock not held by another thread, this method
* increments the sequence number (which thus becomes an odd
* number), sets the lock hold count to one, and returns
* immediately.
*
* <p>If the lock is held by another thread then the current
* thread may retry acquiring this lock, depending on the {@code
* spin} count established in constructor. If the lock is still
* not acquired, the current thread becomes disabled for thread
* scheduling purposes and lies dormant until one of three things
* happens:
*
* <ul>
*
* <li>The lock is acquired by the current thread; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
*
* <li>The specified waiting time elapses
*
* </ul>
*
* <p>If the lock is acquired then the value {@code true} is returned and
* the lock hold count is set to one.
*
* <p>If the current thread:
*
* <ul>
*
* <li>has its interrupted status set on entry to this method; or
*
* <li>is {@linkplain Thread#interrupt interrupted} while
* acquiring the lock,
*
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* <p>In this implementation, as this method is an explicit
* interruption point, preference is given to responding to the
* interrupt over normal or reentrant acquisition of the lock, and
* over reporting the elapse of the waiting time.
*
* @param timeout the time to wait for the lock
* @param unit the time unit of the timeout argument
* @return {@code true} if the lock was free and was acquired by the
* current thread, or the lock was already held by the current
* thread; and {@code false} if the waiting time elapsed before
* the lock could be acquired
* @throws InterruptedException if the current thread is interrupted
* @throws NullPointerException if the time unit is null
*
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1L, unit.toNanos(timeout));
}
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the
* hold count is decremented. If the hold count is now zero then
* the sequence number is incremented (thus becoming an even
* number) and the lock is released. If the current thread is not
* the holder of this lock then {@link
* IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() { sync.release(1); }
/**
* Throws UnsupportedOperationException. SequenceLocks
* do not support Condition objects.
*
* @throws UnsupportedOperationException
*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}
/**
* Queries the number of holds on this lock by the current thread.
*
* <p>A thread has a hold on a lock for each lock action that is not
* matched by an unlock action.
*
* <p>The hold count information is typically only used for testing and
* debugging purposes.
*
* @return the number of holds on this lock by the current thread,
* or zero if this lock is not held by the current thread
*/
public long getHoldCount() { return sync.getHoldCount(); }
/**
* Queries if this lock is held by the current thread.
*
* @return {@code true} if current thread holds this lock and
* {@code false} otherwise
*/
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
/**
* Queries if this lock is held by any thread. This method is
* designed for use in monitoring of the system state,
* not for synchronization control.
*
* @return {@code true} if any thread holds this lock and
* {@code false} otherwise
*/
public boolean isLocked() { return sync.isLocked(); }
/**
* Returns the thread that currently owns this lock, or
* {@code null} if not owned. When this method is called by a
* thread that is not the owner, the return value reflects a
* best-effort approximation of current lock status. For example,
* the owner may be momentarily {@code null} even if there are
* threads trying to acquire the lock but have not yet done so.
* This method is designed to facilitate construction of
* subclasses that provide more extensive lock monitoring
* facilities.
*
* @return the owner, or {@code null} if not owned
*/
protected Thread getOwner() { return sync.getOwner(); }
/**
* Queries whether any threads are waiting to acquire this lock. Note that
* because cancellations may occur at any time, a {@code true}
* return does not guarantee that any other thread will ever
* acquire this lock. This method is designed primarily for use in
* monitoring of the system state.
*
* @return {@code true} if there may be other threads waiting to
* acquire the lock
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Queries whether the given thread is waiting to acquire this
* lock. Note that because cancellations may occur at any time, a
* {@code true} return does not guarantee that this thread
* will ever acquire this lock. This method is designed primarily for use
* in monitoring of the system state.
*
* @param thread the thread
* @return {@code true} if the given thread is queued waiting for this lock
* @throws NullPointerException if the thread is null
*/
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
/**
* Returns an estimate of the number of threads waiting to
* acquire this lock. The value is only an estimate because the number of
* threads may change dynamically while this method traverses
* internal data structures. This method is designed for use in
* monitoring of the system state, not for synchronization
* control.
*
* @return the estimated number of threads waiting for this lock
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* Returns a collection containing threads that may be waiting to
* acquire this lock. Because the actual set of threads may change
* dynamically while constructing this result, the returned
* collection is only a best-effort estimate. The elements of the
* returned collection are in no particular order. This method is
* designed to facilitate construction of subclasses that provide
* more extensive monitoring facilities.
*
* @return the collection of threads
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
/**
* Returns a string identifying this lock, as well as its lock state.
* The state, in brackets, includes either the String {@code "Unlocked"}
* or the String {@code "Locked by"} followed by the
* {@linkplain Thread#getName name} of the owning thread.
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}

View File

@ -0,0 +1,340 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e;
import java.util.Random;
/**
* A package-local class holding common representation and mechanics
* for classes supporting dynamic striping on 64bit values. The class
* extends Number so that concrete subclasses must publicly do so.
*/
abstract class Striped64 extends Number {
/*
* This class maintains a lazily-initialized table of atomically
* updated variables, plus an extra "base" field. The table size
* is a power of two. Indexing uses masked per-thread hash codes.
* Nearly all declarations in this class are package-private,
* accessed directly by subclasses.
*
* Table entries are of class Cell; a variant of AtomicLong padded
* to reduce cache contention on most processors. Padding is
* overkill for most Atomics because they are usually irregularly
* scattered in memory and thus don't interfere much with each
* other. But Atomic objects residing in arrays will tend to be
* placed adjacent to each other, and so will most often share
* cache lines (with a huge negative performance impact) without
* this precaution.
*
* In part because Cells are relatively large, we avoid creating
* them until they are needed. When there is no contention, all
* updates are made to the base field. Upon first contention (a
* failed CAS on base update), the table is initialized to size 2.
* The table size is doubled upon further contention until
* reaching the nearest power of two greater than or equal to the
* number of CPUS. Table slots remain empty (null) until they are
* needed.
*
* A single spinlock ("busy") is used for initializing and
* resizing the table, as well as populating slots with new Cells.
* There is no need for a blocking lock: When the lock is not
* available, threads try other slots (or the base). During these
* retries, there is increased contention and reduced locality,
* which is still better than alternatives.
*
* Per-thread hash codes are initialized to random values.
* Contention and/or table collisions are indicated by failed
* CASes when performing an update operation (see method
* retryUpdate). Upon a collision, if the table size is less than
* the capacity, it is doubled in size unless some other thread
* holds the lock. If a hashed slot is empty, and lock is
* available, a new Cell is created. Otherwise, if the slot
* exists, a CAS is tried. Retries proceed by "double hashing",
* using a secondary hash (Marsaglia XorShift) to try to find a
* free slot.
*
* The table size is capped because, when there are more threads
* than CPUs, supposing that each thread were bound to a CPU,
* there would exist a perfect hash function mapping threads to
* slots that eliminates collisions. When we reach capacity, we
* search for this mapping by randomly varying the hash codes of
* colliding threads. Because search is random, and collisions
* only become known via CAS failures, convergence can be slow,
* and because threads are typically not bound to CPUS forever,
* may not occur at all. However, despite these limitations,
* observed contention rates are typically low in these cases.
*
* It is possible for a Cell to become unused when threads that
* once hashed to it terminate, as well as in the case where
* doubling the table causes no thread to hash to it under
* expanded mask. We do not try to detect or remove such cells,
* under the assumption that for long-running instances, observed
* contention levels will recur, so the cells will eventually be
* needed again; and for short-lived ones, it does not matter.
*/
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
* The value field is placed between pads, hoping that the JVM doesn't
* reorder them.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/**
* Holder for the thread-local hash code. The code is initially
* random, but may be set to a different value upon collisions.
*/
static final class HashCode {
static final Random rng = new Random();
int code;
HashCode() {
int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
code = (h == 0) ? 1 : h;
}
}
/**
* The corresponding ThreadLocal class
*/
static final class ThreadHashCode extends ThreadLocal<HashCode> {
public HashCode initialValue() { return new HashCode(); }
}
/**
* Static per-thread hash codes. Shared across all instances to
* reduce ThreadLocal pollution and because adjustments due to
* collisions in one table are likely to be appropriate for
* others.
*/
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int busy;
/**
* Package-private default constructor
*/
Striped64() {
}
/**
* CASes the base field.
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val);
}
/**
* CASes the busy field from 0 to 1 to acquire lock.
*/
final boolean casBusy() {
return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1);
}
/**
* Computes the function of current and new value. Subclasses
* should open-code this update function for most uses, but the
* virtualized form is needed within retryUpdate.
*
* @param currentValue the current value (of either base or a cell)
* @param newValue the argument from a user update call
* @return result of the update function
*/
abstract long fn(long currentValue, long newValue);
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param hc the hash code holder
* @param wasUncontended false if CAS failed before call
*/
final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
int h = hc.code;
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (busy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (busy == 0 && casBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
busy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, fn(v, x)))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (busy == 0 && casBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
busy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h ^= h << 13; // Rehash
h ^= h >>> 17;
h ^= h << 5;
}
else if (busy == 0 && cells == as && casBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
busy = 0;
}
if (init)
break;
}
else if (casBase(v = base, fn(v, x)))
break; // Fall back on using base
}
hc.code = h; // Record index for next time
}
/**
* Sets base and all cells to the given value.
*/
final void internalReset(long initialValue) {
Cell[] as = cells;
base = initialValue;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null)
a.value = initialValue;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long baseOffset;
private static final long busyOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> sk = Striped64.class;
baseOffset = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
busyOffset = UNSAFE.objectFieldOffset
(sk.getDeclaredField("busy"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
* Replace with a simple call to Unsafe.getUnsafe when integrating
* into a jdk.
*
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
(new java.security
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
java.lang.reflect.Field f = sun.misc
.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
}
}

View File

@ -0,0 +1,273 @@
/*
* Written by Doug Lea and Martin Buchholz with assistance from
* members of JCP JSR-166 Expert Group and released to the public
* domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e.extra;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
/**
* A {@code double} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An {@code
* AtomicDouble} is used in applications such as atomic accumulation,
* and cannot be used as a replacement for a {@link Double}. However,
* this class does extend {@code Number} to allow uniform access by
* tools and utilities that deal with numerically-based classes.
*
* <p><a name="bitEquals">This class compares primitive {@code double}
* values in methods such as {@link #compareAndSet} by comparing their
* bitwise representation using {@link Double#doubleToRawLongBits},
* which differs from both the primitive double {@code ==} operator
* and from {@link Double#equals}, as if implemented by:
* <pre> {@code
* static boolean bitEquals(double x, double y) {
* long xBits = Double.doubleToRawLongBits(x);
* long yBits = Double.doubleToRawLongBits(y);
* return xBits == yBits;
* }}</pre>
*
* @see jsr166e.DoubleAdder
* @see jsr166e.DoubleMaxUpdater
*
* @author Doug Lea
* @author Martin Buchholz
*/
public class AtomicDouble extends Number implements java.io.Serializable {
private static final long serialVersionUID = -8405198993435143622L;
private transient volatile long value;
/**
* Creates a new {@code AtomicDouble} with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicDouble(double initialValue) {
value = doubleToRawLongBits(initialValue);
}
/**
* Creates a new {@code AtomicDouble} with initial value {@code 0.0}.
*/
public AtomicDouble() {
// assert doubleToRawLongBits(0.0) == 0L;
}
/**
* Gets the current value.
*
* @return the current value
*/
public final double get() {
return longBitsToDouble(value);
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(double newValue) {
long next = doubleToRawLongBits(newValue);
value = next;
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
*/
public final void lazySet(double newValue) {
long next = doubleToRawLongBits(newValue);
unsafe.putOrderedLong(this, valueOffset, next);
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final double getAndSet(double newValue) {
long next = doubleToRawLongBits(newValue);
while (true) {
long current = value;
if (unsafe.compareAndSwapLong(this, valueOffset, current, next))
return longBitsToDouble(current);
}
}
/**
* Atomically sets the value to the given updated value
* if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not bitwise equal to the expected value.
*/
public final boolean compareAndSet(double expect, double update) {
return unsafe.compareAndSwapLong(this, valueOffset,
doubleToRawLongBits(expect),
doubleToRawLongBits(update));
}
/**
* Atomically sets the value to the given updated value
* if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value.
*
* <p>May <a
* href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious">
* fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(double expect, double update) {
return compareAndSet(expect, update);
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final double getAndAdd(double delta) {
while (true) {
long current = value;
double currentVal = longBitsToDouble(current);
double nextVal = currentVal + delta;
long next = doubleToRawLongBits(nextVal);
if (unsafe.compareAndSwapLong(this, valueOffset, current, next))
return currentVal;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final double addAndGet(double delta) {
while (true) {
long current = value;
double currentVal = longBitsToDouble(current);
double nextVal = currentVal + delta;
long next = doubleToRawLongBits(nextVal);
if (unsafe.compareAndSwapLong(this, valueOffset, current, next))
return nextVal;
}
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value
*/
public String toString() {
return Double.toString(get());
}
/**
* Returns the value of this {@code AtomicDouble} as an {@code int}
* after a narrowing primitive conversion.
*/
public int intValue() {
return (int) get();
}
/**
* Returns the value of this {@code AtomicDouble} as a {@code long}
* after a narrowing primitive conversion.
*/
public long longValue() {
return (long) get();
}
/**
* Returns the value of this {@code AtomicDouble} as a {@code float}
* after a narrowing primitive conversion.
*/
public float floatValue() {
return (float) get();
}
/**
* Returns the value of this {@code AtomicDouble} as a {@code double}.
*/
public double doubleValue() {
return get();
}
/**
* Saves the state to a stream (that is, serializes it).
*
* @serialData The current value is emitted (a {@code double}).
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeDouble(get());
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
set(s.readDouble());
}
// Unsafe mechanics
private static final sun.misc.Unsafe unsafe = getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicDouble.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
* Replace with a simple call to Unsafe.getUnsafe when integrating
* into a jdk.
*
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
(new java.security
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
java.lang.reflect.Field f = sun.misc
.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
}
}

View File

@ -0,0 +1,320 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166e.extra;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
/**
* A {@code double} array in which elements may be updated atomically.
* See the {@link java.util.concurrent.atomic} package specification
* for description of the properties of atomic variables.
*
* <p><a name="bitEquals">This class compares primitive {@code double}
* values in methods such as {@link #compareAndSet} by comparing their
* bitwise representation using {@link Double#doubleToRawLongBits},
* which differs from both the primitive double {@code ==} operator
* and from {@link Double#equals}, as if implemented by:
* <pre> {@code
* static boolean bitEquals(double x, double y) {
* long xBits = Double.doubleToRawLongBits(x);
* long yBits = Double.doubleToRawLongBits(y);
* return xBits == yBits;
* }}</pre>
*
* @author Doug Lea
* @author Martin Buchholz
*/
public class AtomicDoubleArray implements java.io.Serializable {
private static final long serialVersionUID = -2308431214976778248L;
private final transient long[] array;
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
/**
* Creates a new {@code AtomicDoubleArray} of the given length,
* with all elements initially zero.
*
* @param length the length of the array
*/
public AtomicDoubleArray(int length) {
array = new long[length];
}
/**
* Creates a new {@code AtomicDoubleArray} with the same length
* as, and all elements copied from, the given array.
*
* @param array the array to copy elements from
* @throws NullPointerException if array is null
*/
public AtomicDoubleArray(double[] array) {
// Visibility guaranteed by final field guarantees
final int len = array.length;
final long[] a = new long[len];
for (int i = 0; i < len; i++)
a[i] = doubleToRawLongBits(array[i]);
this.array = a;
}
/**
* Returns the length of the array.
*
* @return the length of the array
*/
public final int length() {
return array.length;
}
/**
* Gets the current value at position {@code i}.
*
* @param i the index
* @return the current value
*/
public final double get(int i) {
return longBitsToDouble(getRaw(checkedByteOffset(i)));
}
private long getRaw(long offset) {
return unsafe.getLongVolatile(array, offset);
}
/**
* Sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
*/
public final void set(int i, double newValue) {
long next = doubleToRawLongBits(newValue);
unsafe.putLongVolatile(array, checkedByteOffset(i), next);
}
/**
* Eventually sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
*/
public final void lazySet(int i, double newValue) {
long next = doubleToRawLongBits(newValue);
unsafe.putOrderedLong(array, checkedByteOffset(i), next);
}
/**
* Atomically sets the element at position {@code i} to the given value
* and returns the old value.
*
* @param i the index
* @param newValue the new value
* @return the previous value
*/
public final double getAndSet(int i, double newValue) {
long next = doubleToRawLongBits(newValue);
long offset = checkedByteOffset(i);
while (true) {
long current = getRaw(offset);
if (compareAndSetRaw(offset, current, next))
return longBitsToDouble(current);
}
}
/**
* Atomically sets the element at position {@code i} to the given
* updated value
* if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, double expect, double update) {
return compareAndSetRaw(checkedByteOffset(i),
doubleToRawLongBits(expect),
doubleToRawLongBits(update));
}
private boolean compareAndSetRaw(long offset, long expect, long update) {
return unsafe.compareAndSwapLong(array, offset, expect, update);
}
/**
* Atomically sets the element at position {@code i} to the given
* updated value
* if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value.
*
* <p>May <a
* href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious">
* fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return true if successful
*/
public final boolean weakCompareAndSet(int i, double expect, double update) {
return compareAndSet(i, expect, update);
}
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the previous value
*/
public final double getAndAdd(int i, double delta) {
long offset = checkedByteOffset(i);
while (true) {
long current = getRaw(offset);
double currentVal = longBitsToDouble(current);
double nextVal = currentVal + delta;
long next = doubleToRawLongBits(nextVal);
if (compareAndSetRaw(offset, current, next))
return currentVal;
}
}
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the updated value
*/
public double addAndGet(int i, double delta) {
long offset = checkedByteOffset(i);
while (true) {
long current = getRaw(offset);
double currentVal = longBitsToDouble(current);
double nextVal = currentVal + delta;
long next = doubleToRawLongBits(nextVal);
if (compareAndSetRaw(offset, current, next))
return nextVal;
}
}
/**
* Returns the String representation of the current values of array.
* @return the String representation of the current values of array
*/
public String toString() {
int iMax = array.length - 1;
if (iMax == -1)
return "[]";
// Double.toString(Math.PI).length() == 17
StringBuilder b = new StringBuilder((17 + 2) * (iMax + 1));
b.append('[');
for (int i = 0;; i++) {
b.append(longBitsToDouble(getRaw(byteOffset(i))));
if (i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
}
/**
* Saves the state to a stream (that is, serializes it).
*
* @serialData The length of the array is emitted (int), followed by all
* of its elements (each a {@code double}) in the proper order.
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
// Write out array length
int length = length();
s.writeInt(length);
// Write out all elements in the proper order.
for (int i = 0; i < length; i++)
s.writeDouble(get(i));
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
// Read in array length and allocate array
int length = s.readInt();
unsafe.putObjectVolatile(this, arrayOffset, new long[length]);
// Read in all elements in the proper order.
for (int i = 0; i < length; i++)
set(i, s.readDouble());
}
// Unsafe mechanics
private static final sun.misc.Unsafe unsafe = getUnsafe();
private static final long arrayOffset;
private static final int base = unsafe.arrayBaseOffset(long[].class);
private static final int shift;
static {
try {
Class<?> k = AtomicDoubleArray.class;
arrayOffset = unsafe.objectFieldOffset
(k.getDeclaredField("array"));
int scale = unsafe.arrayIndexScale(long[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
* Replace with a simple call to Unsafe.getUnsafe when integrating
* into a jdk.
*
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
(new java.security
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
java.lang.reflect.Field f = sun.misc
.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
// Built on 2012-06-09
package jsr166e;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,480 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
/**
* A resultless {@link ForkJoinTask} with a completion action
* performed when triggered and there are no remaining pending
* actions. Uses of CountedCompleter are similar to those of other
* completion based components (such as {@link
* java.nio.channels.CompletionHandler}) except that multiple
* <em>pending</em> completions may be necessary to trigger the {@link
* #onCompletion} action, not just one. Unless initialized otherwise,
* the {@link #getPendingCount pending count} starts at zero, but may
* be (atomically) changed using methods {@link #setPendingCount},
* {@link #addToPendingCount}, and {@link
* #compareAndSetPendingCount}. Upon invocation of {@link
* #tryComplete}, if the pending action count is nonzero, it is
* decremented; otherwise, the completion action is performed, and if
* this completer itself has a completer, the process is continued
* with its completer. As is the case with related synchronization
* components such as {@link Phaser} and {@link
* java.util.concurrent.Semaphore} these methods affect only internal
* counts; they do not establish any further internal bookkeeping. In
* particular, the identities of pending tasks are not maintained. As
* illustrated below, you can create subclasses that do record some or
* all pended tasks or their results when needed.
*
* <p>A concrete CountedCompleter class must define method {@link
* #compute}, that should, in almost all use cases, invoke {@code
* tryComplete()} once before returning. The class may also optionally
* override method {@link #onCompletion} to perform an action upon
* normal completion, and method {@link #onExceptionalCompletion} to
* perform an action upon any exception.
*
* <p>A CountedCompleter that does not itself have a completer (i.e.,
* one for which {@link #getCompleter} returns {@code null}) can be
* used as a regular ForkJoinTask with this added functionality.
* However, any completer that in turn has another completer serves
* only as an internal helper for other computations, so its own task
* status (as reported in methods such as {@link ForkJoinTask#isDone})
* is arbitrary; this status changes only upon explicit invocations of
* {@link #complete}, {@link ForkJoinTask#cancel}, {@link
* ForkJoinTask#completeExceptionally} or upon exceptional completion
* of method {@code compute}. Upon any exceptional completion, the
* exception may be relayed to a task's completer (and its completer,
* and so on), if one exists and it has not otherwise already
* completed.
*
* <p><b>Sample Usages.</b>
*
* <p><b>Parallel recursive decomposition.</b> CountedCompleters may
* be arranged in trees similar to those often used with {@link
* RecursiveAction}s, although the constructions involved in setting
* them up typically vary. Even though they entail a bit more
* bookkeeping, CountedCompleters may be better choices when applying
* a possibly time-consuming operation (that cannot be further
* subdivided) to each element of an array or collection; especially
* when the operation takes a significantly different amount of time
* to complete for some elements than others, either because of
* intrinsic variation (for example IO) or auxiliary effects such as
* garbage collection. Because CountedCompleters provide their own
* continuations, other threads need not block waiting to perform
* them.
*
* <p> For example, here is an initial version of a class that uses
* divide-by-two recursive decomposition to divide work into single
* pieces (leaf tasks). Even when work is split into individual calls,
* tree-based techniques are usually preferable to directly forking
* leaf tasks, because they reduce inter-thread communication and
* improve load balancing. In the recursive case, the second of each
* pair of subtasks to finish triggers completion of its parent
* (because no result combination is performed, the default no-op
* implementation of method {@code onCompletion} is not overridden). A
* static utility method sets up the base task and invokes it:
*
* <pre> {@code
* class MyOperation<E> { void apply(E e) { ... } }
*
* class ForEach<E> extends CountedCompleter {
*
* public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
* pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
* }
*
* final E[] array; final MyOperation<E> op; final int lo, hi;
* ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
* super(p);
* this.array = array; this.op = op; this.lo = lo; this.hi = hi;
* }
*
* public void compute() { // version 1
* if (hi - lo >= 2) {
* int mid = (lo + hi) >>> 1;
* setPendingCount(2); // must set pending count before fork
* new ForEach(this, array, op, mid, hi).fork(); // right child
* new ForEach(this, array, op, lo, mid).fork(); // left child
* }
* else if (hi > lo)
* op.apply(array[lo]);
* tryComplete();
* }
* } }</pre>
*
* This design can be improved by noticing that in the recursive case,
* the task has nothing to do after forking its right task, so can
* directly invoke its left task before returning. (This is an analog
* of tail recursion removal.) Also, because the task returns upon
* executing its left task (rather than falling through to invoke
* tryComplete) the pending count is set to one:
*
* <pre> {@code
* class ForEach<E> ...
* public void compute() { // version 2
* if (hi - lo >= 2) {
* int mid = (lo + hi) >>> 1;
* setPendingCount(1); // only one pending
* new ForEach(this, array, op, mid, hi).fork(); // right child
* new ForEach(this, array, op, lo, mid).compute(); // direct invoke
* }
* else {
* if (hi > lo)
* op.apply(array[lo]);
* tryComplete();
* }
* }
* }</pre>
*
* As a further improvement, notice that the left task need not even
* exist. Instead of creating a new one, we can iterate using the
* original task, and add a pending count for each fork:
*
* <pre> {@code
* class ForEach<E> ...
* public void compute() { // version 3
* int l = lo, h = hi;
* while (h - l >= 2) {
* int mid = (l + h) >>> 1;
* addToPendingCount(1);
* new ForEach(this, array, op, mid, h).fork(); // right child
* h = mid;
* }
* if (h > l)
* op.apply(array[l]);
* tryComplete();
* }
* }</pre>
*
* Additional improvements of such classes might entail precomputing
* pending counts so that they can be established in constructors,
* specializing classes for leaf steps, subdividing by say, four,
* instead of two per iteration, and using an adaptive threshold
* instead of always subdividing down to single elements.
*
* <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
* results of multiple subtasks usually need to access these results
* in method {@link #onCompletion}. As illustrated in the following
* class (that performs a simplified form of map-reduce where mappings
* and reductions are all of type {@code E}), one way to do this in
* divide and conquer designs is to have each subtask record its
* sibling, so that it can be accessed in method {@code onCompletion}.
* For clarity, this class uses explicit left and right subtasks, but
* variants of other streamlinings seen in the above example may also
* apply.
*
* <pre> {@code
* class MyMapper<E> { E apply(E v) { ... } }
* class MyReducer<E> { E apply(E x, E y) { ... } }
* class MapReducer<E> extends CountedCompleter {
* final E[] array; final MyMapper<E> mapper;
* final MyReducer<E> reducer; final int lo, hi;
* MapReducer sibling;
* E result;
* MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
* MyReducer<E> reducer, int lo, int hi) {
* super(p);
* this.array = array; this.mapper = mapper;
* this.reducer = reducer; this.lo = lo; this.hi = hi;
* }
* public void compute() {
* if (hi - lo >= 2) {
* int mid = (lo + hi) >>> 1;
* MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
* MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
* left.sibling = right;
* right.sibling = left;
* setPendingCount(1); // only right is pending
* right.fork();
* left.compute(); // directly execute left
* }
* else {
* if (hi > lo)
* result = mapper.apply(array[lo]);
* tryComplete();
* }
* }
* public void onCompletion(CountedCompleter caller) {
* if (caller != this) {
* MapReducer<E> child = (MapReducer<E>)caller;
* MapReducer<E> sib = child.sibling;
* if (sib == null || sib.result == null)
* result = child.result;
* else
* result = reducer.apply(child.result, sib.result);
* }
* }
*
* public static <E> E mapReduce(ForkJoinPool pool, E[] array,
* MyMapper<E> mapper, MyReducer<E> reducer) {
* MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
* reducer, 0, array.length);
* pool.invoke(mr);
* return mr.result;
* }
* } }</pre>
*
* <p><b>Triggers.</b> Some CountedCompleters are themselves never
* forked, but instead serve as bits of plumbing in other designs;
* including those in which the completion of one of more async tasks
* triggers another async task. For example:
*
* <pre> {@code
* class HeaderBuilder extends CountedCompleter { ... }
* class BodyBuilder extends CountedCompleter { ... }
* class PacketSender extends CountedCompleter {
* PacketSender(...) { super(null, 1); ... } // trigger on second completion
* public void compute() { } // never called
* public void onCompletion(CountedCompleter caller) { sendPacket(); }
* }
* // sample use:
* PacketSender p = new PacketSender();
* new HeaderBuilder(p, ...).fork();
* new BodyBuilder(p, ...).fork();
* }</pre>
*
* @since 1.8
* @author Doug Lea
*/
public abstract class CountedCompleter extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453752276485070L;
/** This task's completer, or null if none */
final CountedCompleter completer;
/** The number of pending tasks until completion */
volatile int pending;
/**
* Creates a new CountedCompleter with the given completer
* and initial pending count.
*
* @param completer this tasks completer, or {@code null} if none
* @param initialPendingCount the initial pending count
*/
protected CountedCompleter(CountedCompleter completer,
int initialPendingCount) {
this.completer = completer;
this.pending = initialPendingCount;
}
/**
* Creates a new CountedCompleter with the given completer
* and an initial pending count of zero.
*
* @param completer this tasks completer, or {@code null} if none
*/
protected CountedCompleter(CountedCompleter completer) {
this.completer = completer;
}
/**
* Creates a new CountedCompleter with no completer
* and an initial pending count of zero.
*/
protected CountedCompleter() {
this.completer = null;
}
/**
* The main computation performed by this task.
*/
public abstract void compute();
/**
* Performs an action when method {@link #tryComplete} is invoked
* and there are no pending counts, or when the unconditional
* method {@link #complete} is invoked. By default, this method
* does nothing.
*
* @param caller the task invoking this method (which may
* be this task itself).
*/
public void onCompletion(CountedCompleter caller) {
}
/**
* Performs an action when method {@link #completeExceptionally}
* is invoked or method {@link #compute} throws an exception, and
* this task has not otherwise already completed normally. On
* entry to this method, this task {@link
* ForkJoinTask#isCompletedAbnormally}. The return value of this
* method controls further propagation: If {@code true} and this
* task has a completer, then this completer is also completed
* exceptionally. The default implementation of this method does
* nothing except return {@code true}.
*
* @param ex the exception
* @param caller the task invoking this method (which may
* be this task itself).
* @return true if this exception should be propagated to this
* tasks completer, if one exists.
*/
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
return true;
}
/**
* Returns the completer established in this task's constructor,
* or {@code null} if none.
*
* @return the completer
*/
public final CountedCompleter getCompleter() {
return completer;
}
/**
* Returns the current pending count.
*
* @return the current pending count
*/
public final int getPendingCount() {
return pending;
}
/**
* Sets the pending count to the given value.
*
* @param count the count
*/
public final void setPendingCount(int count) {
pending = count;
}
/**
* Adds (atomically) the given value to the pending count.
*
* @param delta the value to add
*/
public final void addToPendingCount(int delta) {
int c; // note: can replace with intrinsic in jdk8
do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
}
/**
* Sets (atomically) the pending count to the given count only if
* it currently holds the given expected value.
*
* @param expected the expected value
* @param count the new value
* @return true is successful
*/
public final boolean compareAndSetPendingCount(int expected, int count) {
return U.compareAndSwapInt(this, PENDING, expected, count);
}
/**
* If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion} and then similarly
* tries to complete this task's completer, if one exists,
* else marks this task as complete.
*/
public final void tryComplete() {
CountedCompleter a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
return;
}
}
/**
* Regardless of pending count, invokes {@link #onCompletion},
* marks this task as complete with a {@code null} return value,
* and further triggers {@link #tryComplete} on this task's
* completer, if one exists. This method may be useful when
* forcing completion as soon as any one (versus all) of several
* subtask results are obtained.
*
* @param mustBeNull the {@code null} completion value
*/
public void complete(Void mustBeNull) {
CountedCompleter p;
onCompletion(this);
quietlyComplete();
if ((p = completer) != null)
p.tryComplete();
}
/**
* Support for FJT exception propagation
*/
void internalPropagateException(Throwable ex) {
CountedCompleter a = this, s = a;
while (a.onExceptionalCompletion(ex, s) &&
(a = (s = a).completer) != null && a.status >= 0)
a.recordExceptionalCompletion(ex);
}
/**
* Implements execution conventions for CountedCompleters
*/
protected final boolean exec() {
compute();
return false;
}
/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }
/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long PENDING;
static {
try {
U = getUnsafe();
PENDING = U.objectFieldOffset
(CountedCompleter.class.getDeclaredField("pending"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
* Replace with a simple call to Unsafe.getUnsafe when integrating
* into a jdk.
*
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
(new java.security
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
java.lang.reflect.Field f = sun.misc
.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,119 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
* in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*/
final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
final ForkJoinPool pool; // the pool this thread works in
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super(pool.nextWorkerName());
setDaemon(true);
Thread.UncaughtExceptionHandler ueh = pool.ueh;
if (ueh != null)
setUncaughtExceptionHandler(ueh);
this.pool = pool;
pool.registerWorker(this.workQueue = new ForkJoinPool.WorkQueue
(pool, this, pool.localMode));
}
/**
* Returns the pool hosting this thread.
*
* @return the pool
*/
public ForkJoinPool getPool() {
return pool;
}
/**
* Returns the index number of this thread in its pool. The
* returned value ranges from zero to the maximum number of
* threads (minus one) that have ever been created in the pool.
* This method may be useful for applications that track status or
* collect results per-worker rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
return workQueue.poolIndex;
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
}
/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,164 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
/**
* A recursive resultless {@link ForkJoinTask}. This class
* establishes conventions to parameterize resultless actions as
* {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
* only valid value of type {@code Void}, methods such as {@code join}
* always return {@code null} upon completion.
*
* <p><b>Sample Usages.</b> Here is a simple but complete ForkJoin
* sort that sorts a given {@code long[]} array:
*
* <pre> {@code
* static class SortTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* SortTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* SortTask(long[] array) { this(array, 0, array.length); }
* protected void compute() {
* if (hi - lo < THRESHOLD)
* sortSequentially(lo, hi);
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new SortTask(array, lo, mid),
* new SortTask(array, mid, hi));
* merge(lo, mid, hi);
* }
* }
* // implementation details follow:
* final static int THRESHOLD = 1000;
* void sortSequentially(int lo, int hi) {
* Arrays.sort(array, lo, hi);
* }
* void merge(int lo, int mid, int hi) {
* long[] buf = Arrays.copyOfRange(array, lo, mid);
* for (int i = 0, j = lo, k = mid; i < buf.length; j++)
* array[j] = (k == hi || buf[i] < array[k]) ?
* buf[i++] : array[k++];
* }
* }}</pre>
*
* You could then sort {@code anArray} by creating {@code new
* SortTask(anArray)} and invoking it in a ForkJoinPool. As a more
* concrete simple example, the following task increments each element
* of an array:
* <pre> {@code
* class IncrementTask extends RecursiveAction {
* final long[] array; final int lo, hi;
* IncrementTask(long[] array, int lo, int hi) {
* this.array = array; this.lo = lo; this.hi = hi;
* }
* protected void compute() {
* if (hi - lo < THRESHOLD) {
* for (int i = lo; i < hi; ++i)
* array[i]++;
* }
* else {
* int mid = (lo + hi) >>> 1;
* invokeAll(new IncrementTask(array, lo, mid),
* new IncrementTask(array, mid, hi));
* }
* }
* }}</pre>
*
* <p>The following example illustrates some refinements and idioms
* that may lead to better performance: RecursiveActions need not be
* fully recursive, so long as they maintain the basic
* divide-and-conquer approach. Here is a class that sums the squares
* of each element of a double array, by subdividing out only the
* right-hand-sides of repeated divisions by two, and keeping track of
* them with a chain of {@code next} references. It uses a dynamic
* threshold based on method {@code getSurplusQueuedTaskCount}, but
* counterbalances potential excess partitioning by directly
* performing leaf actions on unstolen tasks rather than further
* subdividing.
*
* <pre> {@code
* double sumOfSquares(ForkJoinPool pool, double[] array) {
* int n = array.length;
* Applyer a = new Applyer(array, 0, n, null);
* pool.invoke(a);
* return a.result;
* }
*
* class Applyer extends RecursiveAction {
* final double[] array;
* final int lo, hi;
* double result;
* Applyer next; // keeps track of right-hand-side tasks
* Applyer(double[] array, int lo, int hi, Applyer next) {
* this.array = array; this.lo = lo; this.hi = hi;
* this.next = next;
* }
*
* double atLeaf(int l, int h) {
* double sum = 0;
* for (int i = l; i < h; ++i) // perform leftmost base step
* sum += array[i] * array[i];
* return sum;
* }
*
* protected void compute() {
* int l = lo;
* int h = hi;
* Applyer right = null;
* while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
* int mid = (l + h) >>> 1;
* right = new Applyer(array, mid, h, right);
* right.fork();
* h = mid;
* }
* double sum = atLeaf(l, h);
* while (right != null) {
* if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi);
* else {
* right.join();
* sum += right.result;
* }
* right = right.next;
* }
* result = sum;
* }
* }}</pre>
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }
/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }
/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}

View File

@ -0,0 +1,68 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
/**
* A recursive result-bearing {@link ForkJoinTask}.
*
* <p>For a classic example, here is a task computing Fibonacci numbers:
*
* <pre> {@code
* class Fibonacci extends RecursiveTask<Integer> {
* final int n;
* Fibonacci(int n) { this.n = n; }
* Integer compute() {
* if (n <= 1)
* return n;
* Fibonacci f1 = new Fibonacci(n - 1);
* f1.fork();
* Fibonacci f2 = new Fibonacci(n - 2);
* return f2.compute() + f1.join();
* }
* }}</pre>
*
* However, besides being a dumb way to compute Fibonacci functions
* (there is a simple fast linear algorithm that you'd use in
* practice), this is likely to perform poorly because the smallest
* subtasks are too small to be worthwhile splitting up. Instead, as
* is the case for nearly all fork/join applications, you'd pick some
* minimum granularity size (for example 10 here) for which you always
* sequentially solve rather than subdividing.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}

View File

@ -0,0 +1,197 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
import java.util.Random;
/**
* A random number generator isolated to the current thread. Like the
* global {@link java.util.Random} generator used by the {@link
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
* with an internally generated seed that may not otherwise be
* modified. When applicable, use of {@code ThreadLocalRandom} rather
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
* in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
* {@code X} is {@code Int}, {@code Long}, etc).
* When all usages are of this form, it is never possible to
* accidently share a {@code ThreadLocalRandom} across multiple threads.
*
* <p>This class also provides additional commonly used bounded random
* generation methods.
*
* @since 1.7
* @author Doug Lea
*/
public class ThreadLocalRandom extends Random {
// same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
/**
* The random seed. We can't use super.seed.
*/
private long rnd;
/**
* Initialization flag to permit calls to setSeed to succeed only
* while executing the Random constructor. We can't allow others
* since it would cause setting seed in one part of a program to
* unintentionally impact other usages by the thread.
*/
boolean initialized;
// Padding to help avoid memory contention among seed updates in
// different TLRs in the common case that they are located near
// each other.
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};
/**
* Constructor called only by localRandom.initialValue.
*/
ThreadLocalRandom() {
super();
initialized = true;
}
/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
return localRandom.get();
}
/**
* Throws {@code UnsupportedOperationException}. Setting seeds in
* this generator is not supported.
*
* @throws UnsupportedOperationException always
*/
public void setSeed(long seed) {
if (initialized)
throw new UnsupportedOperationException();
rnd = (seed ^ multiplier) & mask;
}
protected int next(int bits) {
rnd = (rnd * multiplier + addend) & mask;
return (int) (rnd >>> (48-bits));
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @throws IllegalArgumentException if least greater than or equal
* to bound
* @return the next value
*/
public int nextInt(int least, int bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextInt(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public long nextLong(long n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
// Divide n by two until small enough for nextInt. On each
// iteration (at most 31 of them but usually much less),
// randomly choose both whether to include high bit in result
// (offset) and whether to continue with the lower vs upper
// half (which makes a difference only if odd).
long offset = 0;
while (n >= Integer.MAX_VALUE) {
int bits = next(2);
long half = n >>> 1;
long nextn = ((bits & 2) == 0) ? half : n - half;
if ((bits & 1) == 0)
offset += n - nextn;
n = nextn;
}
return offset + nextInt((int) n);
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public long nextLong(long least, long bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextLong(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed {@code double} value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public double nextDouble(double n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
return nextDouble() * n;
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public double nextDouble(double least, double bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextDouble() * (bound - least) + least;
}
private static final long serialVersionUID = -5851777807851030925L;
}

View File

@ -0,0 +1,133 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
import java.util.concurrent.*;
/**
* A {@link BlockingQueue} in which producers may wait for consumers
* to receive elements. A {@code TransferQueue} may be useful for
* example in message passing applications in which producers
* sometimes (using method {@link #transfer}) await receipt of
* elements by consumers invoking {@code take} or {@code poll}, while
* at other times enqueue elements (via method {@code put}) without
* waiting for receipt.
* {@linkplain #tryTransfer(Object) Non-blocking} and
* {@linkplain #tryTransfer(Object,long,TimeUnit) time-out} versions of
* {@code tryTransfer} are also available.
* A {@code TransferQueue} may also be queried, via {@link
* #hasWaitingConsumer}, whether there are any threads waiting for
* items, which is a converse analogy to a {@code peek} operation.
*
* <p>Like other blocking queues, a {@code TransferQueue} may be
* capacity bounded. If so, an attempted transfer operation may
* initially block waiting for available space, and/or subsequently
* block waiting for reception by a consumer. Note that in a queue
* with zero capacity, such as {@link SynchronousQueue}, {@code put}
* and {@code transfer} are effectively synonymous.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.7
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface TransferQueue<E> extends BlockingQueue<E> {
/**
* Transfers the element to a waiting consumer immediately, if possible.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* otherwise returning {@code false} without enqueuing the element.
*
* @param e the element to transfer
* @return {@code true} if the element was transferred, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e);
/**
* Transfers the element to a consumer, waiting if necessary to do so.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else waits until the element is received by a consumer.
*
* @param e the element to transfer
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not left enqueued
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void transfer(E e) throws InterruptedException;
/**
* Transfers the element to a consumer if it is possible to do so
* before the timeout elapses.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else waits until the element is received by a consumer,
* returning {@code false} if the specified wait time elapses
* before the element can be transferred.
*
* @param e the element to transfer
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before completion,
* in which case the element is not left enqueued
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not left enqueued
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Returns {@code true} if there is at least one consumer waiting
* to receive an element via {@link #take} or
* timed {@link #poll(long,TimeUnit) poll}.
* The return value represents a momentary state of affairs.
*
* @return {@code true} if there is at least one waiting consumer
*/
boolean hasWaitingConsumer();
/**
* Returns an estimate of the number of consumers waiting to
* receive elements via {@link #take} or timed
* {@link #poll(long,TimeUnit) poll}. The return value is an
* approximation of a momentary state of affairs, that may be
* inaccurate if consumers have completed or given up waiting.
* The value may be useful for monitoring and heuristics, but
* not for synchronization control. Implementations of this
* method are likely to be noticeably slower than those for
* {@link #hasWaitingConsumer}.
*
* @return the number of consumers waiting to receive elements
*/
int getWaitingConsumerCount();
}

View File

@ -0,0 +1,30 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
/**
* Preview versions of classes targeted for Java 7. Includes a
* fine-grained parallel computation framework: ForkJoinTasks and
* their related support classes provide a very efficient basis for
* obtaining platform-independent parallel speed-ups of
* computation-intensive operations. They are not a full substitute
* for the kinds of arbitrary processing supported by Executors or
* Threads. However, when applicable, they typically provide
* significantly greater performance on multiprocessor platforms.
*
* <p>Candidates for fork/join processing mainly include those that
* can be expressed using parallel divide-and-conquer techniques: To
* solve a problem, break it in two (or more) parts, and then solve
* those parts in parallel, continuing on in this way until the
* problem is too small to be broken up, so is solved directly. The
* underlying <em>work-stealing</em> framework makes subtasks
* available to other threads (normally one per CPU), that help
* complete the tasks. In general, the most efficient ForkJoinTasks
* are those that directly implement this algorithmic design pattern.
*/
// Built on 2012-06-09
package jsr166y;