update to latest jsr166

This commit is contained in:
Shay Banon 2013-07-10 13:09:04 -07:00
parent abf2268574
commit fe6fb7135b
11 changed files with 5838 additions and 5923 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -8,18 +8,19 @@ package jsr166e;
/** /**
* A {@link ForkJoinTask} with a completion action performed when * A {@link ForkJoinTask} with a completion action performed when
* triggered and there are no remaining pending * triggered and there are no remaining pending actions.
* actions. CountedCompleters are in general more robust in the * CountedCompleters are in general more robust in the
* presence of subtask stalls and blockage than are other forms of * presence of subtask stalls and blockage than are other forms of
* ForkJoinTasks, but are less intuitive to program. Uses of * ForkJoinTasks, but are less intuitive to program. Uses of
* CountedCompleter are similar to those of other completion based * CountedCompleter are similar to those of other completion based
* components (such as {@link java.nio.channels.CompletionHandler}) * components (such as {@link java.nio.channels.CompletionHandler})
* except that multiple <em>pending</em> completions may be necessary * except that multiple <em>pending</em> completions may be necessary
* to trigger the {@link #onCompletion} action, not just one. Unless * to trigger the completion action {@link #onCompletion(CountedCompleter)},
* initialized otherwise, the {@link #getPendingCount pending count} * not just one.
* starts at zero, but may be (atomically) changed using methods * Unless initialized otherwise, the {@linkplain #getPendingCount pending
* {@link #setPendingCount}, {@link #addToPendingCount}, and {@link * count} starts at zero, but may be (atomically) changed using
* #compareAndSetPendingCount}. Upon invocation of {@link * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
* {@link #compareAndSetPendingCount}. Upon invocation of {@link
* #tryComplete}, if the pending action count is nonzero, it is * #tryComplete}, if the pending action count is nonzero, it is
* decremented; otherwise, the completion action is performed, and if * decremented; otherwise, the completion action is performed, and if
* this completer itself has a completer, the process is continued * this completer itself has a completer, the process is continued
@ -40,9 +41,10 @@ package jsr166e;
* <p>A concrete CountedCompleter class must define method {@link * <p>A concrete CountedCompleter class must define method {@link
* #compute}, that should in most cases (as illustrated below), invoke * #compute}, that should in most cases (as illustrated below), invoke
* {@code tryComplete()} once before returning. The class may also * {@code tryComplete()} once before returning. The class may also
* optionally override method {@link #onCompletion} to perform an * optionally override method {@link #onCompletion(CountedCompleter)}
* action upon normal completion, and method {@link * to perform an action upon normal completion, and method
* #onExceptionalCompletion} to perform an action upon any exception. * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
* perform an action upon any exception.
* *
* <p>CountedCompleters most often do not bear results, in which case * <p>CountedCompleters most often do not bear results, in which case
* they are normally declared as {@code CountedCompleter<Void>}, and * they are normally declared as {@code CountedCompleter<Void>}, and
@ -63,13 +65,14 @@ package jsr166e;
* only as an internal helper for other computations, so its own task * only as an internal helper for other computations, so its own task
* status (as reported in methods such as {@link ForkJoinTask#isDone}) * status (as reported in methods such as {@link ForkJoinTask#isDone})
* is arbitrary; this status changes only upon explicit invocations of * is arbitrary; this status changes only upon explicit invocations of
* {@link #complete}, {@link ForkJoinTask#cancel}, {@link * {@link #complete}, {@link ForkJoinTask#cancel},
* ForkJoinTask#completeExceptionally} or upon exceptional completion * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
* of method {@code compute}. Upon any exceptional completion, the * exceptional completion of method {@code compute}. Upon any
* exception may be relayed to a task's completer (and its completer, * exceptional completion, the exception may be relayed to a task's
* and so on), if one exists and it has not otherwise already * completer (and its completer, and so on), if one exists and it has
* completed. Similarly, cancelling an internal CountedCompleter has * not otherwise already completed. Similarly, cancelling an internal
* only a local effect on that completer, so is not often useful. * CountedCompleter has only a local effect on that completer, so is
* not often useful.
* *
* <p><b>Sample Usages.</b> * <p><b>Sample Usages.</b>
* *
@ -96,8 +99,8 @@ package jsr166e;
* improve load balancing. In the recursive case, the second of each * improve load balancing. In the recursive case, the second of each
* pair of subtasks to finish triggers completion of its parent * pair of subtasks to finish triggers completion of its parent
* (because no result combination is performed, the default no-op * (because no result combination is performed, the default no-op
* implementation of method {@code onCompletion} is not overridden). A * implementation of method {@code onCompletion} is not overridden).
* static utility method sets up the base task and invokes it * A static utility method sets up the base task and invokes it
* (here, implicitly using the {@link ForkJoinPool#commonPool()}). * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
* *
* <pre> {@code * <pre> {@code
@ -152,12 +155,11 @@ package jsr166e;
* } * }
* }</pre> * }</pre>
* *
* As a further improvement, notice that the left task need not even * As a further improvement, notice that the left task need not even exist.
* exist. Instead of creating a new one, we can iterate using the * Instead of creating a new one, we can iterate using the original task,
* original task, and add a pending count for each fork. Additionally, * and add a pending count for each fork. Additionally, because no task
* because no task in this tree implements an {@link #onCompletion} * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
* method, {@code tryComplete()} can be replaced with {@link * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
* #propagateCompletion}.
* *
* <pre> {@code * <pre> {@code
* class ForEach<E> ... * class ForEach<E> ...
@ -235,7 +237,7 @@ package jsr166e;
* *
* <p><b>Recording subtasks.</b> CountedCompleter tasks that combine * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
* results of multiple subtasks usually need to access these results * results of multiple subtasks usually need to access these results
* in method {@link #onCompletion}. As illustrated in the following * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
* class (that performs a simplified form of map-reduce where mappings * class (that performs a simplified form of map-reduce where mappings
* and reductions are all of type {@code E}), one way to do this in * and reductions are all of type {@code E}), one way to do this in
* divide and conquer designs is to have each subtask record its * divide and conquer designs is to have each subtask record its
@ -336,7 +338,7 @@ package jsr166e;
* while (h - l >= 2) { * while (h - l >= 2) {
* int mid = (l + h) >>> 1; * int mid = (l + h) >>> 1;
* addToPendingCount(1); * addToPendingCount(1);
* (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork; * (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
* h = mid; * h = mid;
* } * }
* if (h > l) * if (h > l)
@ -357,7 +359,7 @@ package jsr166e;
* *
* <p><b>Triggers.</b> Some CountedCompleters are themselves never * <p><b>Triggers.</b> Some CountedCompleters are themselves never
* forked, but instead serve as bits of plumbing in other designs; * forked, but instead serve as bits of plumbing in other designs;
* including those in which the completion of one of more async tasks * including those in which the completion of one or more async tasks
* triggers another async task. For example: * triggers another async task. For example:
* *
* <pre> {@code * <pre> {@code
@ -437,20 +439,21 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
} }
/** /**
* Performs an action when method {@link #completeExceptionally} * Performs an action when method {@link
* is invoked or method {@link #compute} throws an exception, and * #completeExceptionally(Throwable)} is invoked or method {@link
* this task has not otherwise already completed normally. On * #compute} throws an exception, and this task has not already
* entry to this method, this task {@link * otherwise completed normally. On entry to this method, this task
* ForkJoinTask#isCompletedAbnormally}. The return value of this * {@link ForkJoinTask#isCompletedAbnormally}. The return value
* method controls further propagation: If {@code true} and this * of this method controls further propagation: If {@code true}
* task has a completer, then this completer is also completed * and this task has a completer that has not completed, then that
* exceptionally. The default implementation of this method does * completer is also completed exceptionally, with the same
* nothing except return {@code true}. * exception as this completer. The default implementation of
* this method does nothing except return {@code true}.
* *
* @param ex the exception * @param ex the exception
* @param caller the task invoking this method (which may * @param caller the task invoking this method (which may
* be this task itself) * be this task itself)
* @return true if this exception should be propagated to this * @return {@code true} if this exception should be propagated to this
* task's completer, if one exists * task's completer, if one exists
*/ */
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) { public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
@ -491,7 +494,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* @param delta the value to add * @param delta the value to add
*/ */
public final void addToPendingCount(int delta) { public final void addToPendingCount(int delta) {
int c; // note: can replace with intrinsic in jdk8 int c;
do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta)); do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
} }
@ -501,7 +504,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* *
* @param expected the expected value * @param expected the expected value
* @param count the new value * @param count the new value
* @return true if successful * @return {@code true} if successful
*/ */
public final boolean compareAndSetPendingCount(int expected, int count) { public final boolean compareAndSetPendingCount(int expected, int count) {
return U.compareAndSwapInt(this, PENDING, expected, count); return U.compareAndSwapInt(this, PENDING, expected, count);
@ -535,9 +538,9 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* If the pending count is nonzero, decrements the count; * If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion} and then similarly * otherwise invokes {@link #onCompletion(CountedCompleter)}
* tries to complete this task's completer, if one exists, * and then similarly tries to complete this task's completer,
* else marks this task as complete. * if one exists, else marks this task as complete.
*/ */
public final void tryComplete() { public final void tryComplete() {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
@ -556,12 +559,12 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* Equivalent to {@link #tryComplete} but does not invoke {@link * Equivalent to {@link #tryComplete} but does not invoke {@link
* #onCompletion} along the completion path: If the pending count * #onCompletion(CountedCompleter)} along the completion path:
* is nonzero, decrements the count; otherwise, similarly tries to * If the pending count is nonzero, decrements the count;
* complete this task's completer, if one exists, else marks this * otherwise, similarly tries to complete this task's completer, if
* task as complete. This method may be useful in cases where * one exists, else marks this task as complete. This method may be
* {@code onCompletion} should not, or need not, be invoked for * useful in cases where {@code onCompletion} should not, or need
* each completer in a computation. * not, be invoked for each completer in a computation.
*/ */
public final void propagateCompletion() { public final void propagateCompletion() {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
@ -578,13 +581,15 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
} }
/** /**
* Regardless of pending count, invokes {@link #onCompletion}, * Regardless of pending count, invokes
* marks this task as complete and further triggers {@link * {@link #onCompletion(CountedCompleter)}, marks this task as
* #tryComplete} on this task's completer, if one exists. The * complete and further triggers {@link #tryComplete} on this
* given rawResult is used as an argument to {@link #setRawResult} * task's completer, if one exists. The given rawResult is
* before invoking {@link #onCompletion} or marking this task as * used as an argument to {@link #setRawResult} before invoking
* complete; its value is meaningful only for classes overriding * {@link #onCompletion(CountedCompleter)} or marking this task
* {@code setRawResult}. * as complete; its value is meaningful only for classes
* overriding {@code setRawResult}. This method does not modify
* the pending count.
* *
* <p>This method may be useful when forcing completion as soon as * <p>This method may be useful when forcing completion as soon as
* any one (versus all) of several subtask results are obtained. * any one (versus all) of several subtask results are obtained.
@ -624,8 +629,8 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* If this task does not have a completer, invokes {@link * If this task does not have a completer, invokes {@link
* ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
* this task's pending count is non-zero, decrements its pending * the completer's pending count is non-zero, decrements that
* count and returns {@code null}. Otherwise, returns the * pending count and returns {@code null}. Otherwise, returns the
* completer. This method can be used as part of a completion * completer. This method can be used as part of a completion
* traversal loop for homogeneous task hierarchies: * traversal loop for homogeneous task hierarchies:
* *
@ -667,8 +672,9 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
void internalPropagateException(Throwable ex) { void internalPropagateException(Throwable ex) {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
while (a.onExceptionalCompletion(ex, s) && while (a.onExceptionalCompletion(ex, s) &&
(a = (s = a).completer) != null && a.status >= 0) (a = (s = a).completer) != null && a.status >= 0 &&
a.recordExceptionalCompletion(ex); a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
;
} }
/** /**

File diff suppressed because it is too large Load Diff

View File

@ -134,10 +134,9 @@ import java.lang.reflect.Constructor;
* (DAG). Otherwise, executions may encounter a form of deadlock as * (DAG). Otherwise, executions may encounter a form of deadlock as
* tasks cyclically wait for each other. However, this framework * tasks cyclically wait for each other. However, this framework
* supports other methods and techniques (for example the use of * supports other methods and techniques (for example the use of
* {@link java.util.concurrent.Phaser}, {@link #helpQuiesce}, and * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* {@link #complete}) that
* may be of use in constructing custom subclasses for problems that * may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a * are not statically structured as DAGs. To support such usages, a
* ForkJoinTask may be atomically <em>tagged</em> with a {@code short} * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
* value using {@link #setForkJoinTaskTag} or {@link * value using {@link #setForkJoinTaskTag} or {@link
* #compareAndSetForkJoinTaskTag} and checked using {@link * #compareAndSetForkJoinTaskTag} and checked using {@link
@ -286,9 +285,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
private int externalAwaitDone() { private int externalAwaitDone() {
int s; int s;
ForkJoinPool.externalHelpJoin(this); ForkJoinPool cp = ForkJoinPool.common;
if ((s = status) >= 0) {
if (cp != null) {
if (this instanceof CountedCompleter)
s = cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
s = doExec();
}
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false; boolean interrupted = false;
while ((s = status) >= 0) { do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) { synchronized (this) {
if (status >= 0) { if (status >= 0) {
@ -302,9 +309,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
notifyAll(); notifyAll();
} }
} }
} } while ((s = status) >= 0);
if (interrupted) if (interrupted)
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
}
}
return s; return s;
} }
@ -313,9 +322,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
private int externalInterruptibleAwaitDone() throws InterruptedException { private int externalInterruptibleAwaitDone() throws InterruptedException {
int s; int s;
ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted()) if (Thread.interrupted())
throw new InterruptedException(); throw new InterruptedException();
ForkJoinPool.externalHelpJoin(this); if ((s = status) >= 0 && cp != null) {
if (this instanceof CountedCompleter)
cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
doExec();
}
while ((s = status) >= 0) { while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) { synchronized (this) {
@ -601,15 +616,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** /**
* A version of "sneaky throw" to relay exceptions * A version of "sneaky throw" to relay exceptions
*/ */
static void rethrow(final Throwable ex) { static void rethrow(Throwable ex) {
if (ex != null) { if (ex != null)
if (ex instanceof Error)
throw (Error)ex;
if (ex instanceof RuntimeException)
throw (RuntimeException)ex;
ForkJoinTask.<RuntimeException>uncheckedThrow(ex); ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
} }
}
/** /**
* The sneaky part of sneaky throw, relying on generics * The sneaky part of sneaky throw, relying on generics
@ -618,7 +628,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
@SuppressWarnings("unchecked") static <T extends Throwable> @SuppressWarnings("unchecked") static <T extends Throwable>
void uncheckedThrow(Throwable t) throws T { void uncheckedThrow(Throwable t) throws T {
if (t != null)
throw (T)t; // rely on vacuous cast throw (T)t; // rely on vacuous cast
} }
@ -830,7 +839,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* <p>This method is designed to be invoked by <em>other</em> * <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or * tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or * throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}. * invoke {@link #completeExceptionally(Throwable)}.
* *
* @param mayInterruptIfRunning this value has no effect in the * @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to * default implementation because interrupts are not used to
@ -982,6 +991,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// Messy in part because we measure in nanosecs, but wait in millisecs // Messy in part because we measure in nanosecs, but wait in millisecs
int s; long ms; int s; long ms;
long ns = unit.toNanos(timeout); long ns = unit.toNanos(timeout);
ForkJoinPool cp;
if ((s = status) >= 0 && ns > 0L) { if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns; long deadline = System.nanoTime() + ns;
ForkJoinPool p = null; ForkJoinPool p = null;
@ -993,8 +1003,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
w = wt.workQueue; w = wt.workQueue;
p.helpJoinOnce(w, this); // no retries on failure p.helpJoinOnce(w, this); // no retries on failure
} }
else else if ((cp = ForkJoinPool.common) != null) {
ForkJoinPool.externalHelpJoin(this); if (this instanceof CountedCompleter)
cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
doExec();
}
boolean canBlock = false; boolean canBlock = false;
boolean interrupted = false; boolean interrupted = false;
try { try {
@ -1002,7 +1016,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (w != null && w.qlock < 0) if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this); cancelIgnoringExceptions(this);
else if (!canBlock) { else if (!canBlock) {
if (p == null || p.tryCompensate()) if (p == null || p.tryCompensate(p.ctl))
canBlock = true; canBlock = true;
} }
else { else {
@ -1143,7 +1157,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Thread t; Thread t;
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
ForkJoinPool.tryExternalUnpush(this)); ForkJoinPool.common.tryExternalUnpush(this));
} }
/** /**
@ -1312,7 +1326,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* *
* @param e the expected tag value * @param e the expected tag value
* @param tag the new tag value * @param tag the new tag value
* @return true if successful; i.e., the current value was * @return {@code true} if successful; i.e., the current value was
* equal to e and is now tag. * equal to e and is now tag.
* @since 1.8 * @since 1.8
*/ */
@ -1364,6 +1378,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static final long serialVersionUID = 5232453952276885070L; private static final long serialVersionUID = 5232453952276885070L;
} }
/**
* Adaptor for Runnables in which failure forces worker exception
*/
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
void internalPropagateException(Throwable ex) {
rethrow(ex); // rethrow outside exec() catches.
}
private static final long serialVersionUID = 5232453952276885070L;
}
/** /**
* Adaptor for Callables * Adaptor for Callables
*/ */

View File

@ -6,6 +6,7 @@
package jsr166e; package jsr166e;
/** /**
* A thread managed by a {@link ForkJoinPool}, which executes * A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s. * {@link ForkJoinTask}s.
@ -14,8 +15,8 @@ package jsr166e;
* scheduling or execution. However, you can override initialization * scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop. * and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a * If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* in a {@code ForkJoinPool}. * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
* *
* @since 1.7 * @since 1.7
* @author Doug Lea * @author Doug Lea
@ -60,16 +61,17 @@ public class ForkJoinWorkerThread extends Thread {
} }
/** /**
* Returns the index number of this thread in its pool. The * Returns the unique index number of this thread in its pool.
* returned value ranges from zero to the maximum number of * The returned value ranges from zero to the maximum number of
* threads (minus one) that have ever been created in the pool. * threads (minus one) that may exist in the pool, and does not
* This method may be useful for applications that track status or * change during the lifetime of the thread. This method may be
* collect results per-worker rather than per-task. * useful for applications that track status or collect results
* per-worker-thread rather than per-task.
* *
* @return the index number * @return the index number
*/ */
public int getPoolIndex() { public int getPoolIndex() {
return workQueue.poolIndex; return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
} }
/** /**

View File

@ -5,10 +5,8 @@
*/ */
package jsr166e; package jsr166e;
import jsr166y.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
@ -120,33 +118,16 @@ import java.util.concurrent.locks.LockSupport;
* } * }
* } * }
* *
* double distanceFromOriginV1() { // A read-only method * double distanceFromOrigin() { // A read-only method
* long stamp; * long stamp = sl.tryOptimisticRead();
* if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic * double currentX = x, currentY = y;
* double currentX = x; * if (!sl.validate(stamp)) {
* double currentY = y; * stamp = sl.readLock();
* if (sl.validate(stamp))
* return Math.sqrt(currentX * currentX + currentY * currentY);
* }
* stamp = sl.readLock(); // fall back to read lock
* try {
* double currentX = x;
* double currentY = y;
* return Math.sqrt(currentX * currentX + currentY * currentY);
* } finally {
* sl.unlockRead(stamp);
* }
* }
*
* double distanceFromOriginV2() { // combines code paths
* double currentX = 0.0, currentY = 0.0;
* for (long stamp = sl.tryOptimisticRead(); ; stamp = sl.readLock()) {
* try { * try {
* currentX = x; * currentX = x;
* currentY = y; * currentY = y;
* } finally { * } finally {
* if (sl.tryConvertToOptimisticRead(stamp) != 0L) // unlock or validate * sl.unlockRead(stamp);
* break;
* } * }
* } * }
* return Math.sqrt(currentX * currentX + currentY * currentY); * return Math.sqrt(currentX * currentX + currentY * currentY);
@ -359,6 +340,8 @@ public class StampedLock implements java.io.Serializable {
* Behavior under timeout and interruption matches that specified * Behavior under timeout and interruption matches that specified
* for method {@link Lock#tryLock(long,TimeUnit)}. * for method {@link Lock#tryLock(long,TimeUnit)}.
* *
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return a stamp that can be used to unlock or convert mode, * @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available * or zero if the lock is not available
* @throws InterruptedException if the current thread is interrupted * @throws InterruptedException if the current thread is interrupted
@ -438,6 +421,8 @@ public class StampedLock implements java.io.Serializable {
* Behavior under timeout and interruption matches that specified * Behavior under timeout and interruption matches that specified
* for method {@link Lock#tryLock(long,TimeUnit)}. * for method {@link Lock#tryLock(long,TimeUnit)}.
* *
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return a stamp that can be used to unlock or convert mode, * @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available * or zero if the lock is not available
* @throws InterruptedException if the current thread is interrupted * @throws InterruptedException if the current thread is interrupted
@ -503,7 +488,8 @@ public class StampedLock implements java.io.Serializable {
* obtained from {@link #tryOptimisticRead} or a locking method * obtained from {@link #tryOptimisticRead} or a locking method
* for this lock has no defined effect or result. * for this lock has no defined effect or result.
* *
* @return true if the lock has not been exclusively acquired * @param stamp a stamp
* @return {@code true} if the lock has not been exclusively acquired
* since issuance of the given stamp; else false * since issuance of the given stamp; else false
*/ */
public boolean validate(long stamp) { public boolean validate(long stamp) {
@ -716,7 +702,7 @@ public class StampedLock implements java.io.Serializable {
* stamp value. This method may be useful for recovery after * stamp value. This method may be useful for recovery after
* errors. * errors.
* *
* @return true if the lock was held, else false * @return {@code true} if the lock was held, else false
*/ */
public boolean tryUnlockWrite() { public boolean tryUnlockWrite() {
long s; WNode h; long s; WNode h;
@ -734,7 +720,7 @@ public class StampedLock implements java.io.Serializable {
* requiring a stamp value. This method may be useful for recovery * requiring a stamp value. This method may be useful for recovery
* after errors. * after errors.
* *
* @return true if the read lock was held, else false * @return {@code true} if the read lock was held, else false
*/ */
public boolean tryUnlockRead() { public boolean tryUnlockRead() {
long s, m; WNode h; long s, m; WNode h;
@ -752,30 +738,66 @@ public class StampedLock implements java.io.Serializable {
return false; return false;
} }
// status monitoring methods
/** /**
* Returns true if the lock is currently held exclusively. * Returns combined state-held and overflow read count for given
* state s.
*/
private int getReadLockCount(long s) {
long readers;
if ((readers = s & RBITS) >= RFULL)
readers = RFULL + readerOverflow;
return (int) readers;
}
/**
* Returns {@code true} if the lock is currently held exclusively.
* *
* @return true if the lock is currently held exclusively * @return {@code true} if the lock is currently held exclusively
*/ */
public boolean isWriteLocked() { public boolean isWriteLocked() {
return (state & WBIT) != 0L; return (state & WBIT) != 0L;
} }
/** /**
* Returns true if the lock is currently held non-exclusively. * Returns {@code true} if the lock is currently held non-exclusively.
* *
* @return true if the lock is currently held non-exclusively * @return {@code true} if the lock is currently held non-exclusively
*/ */
public boolean isReadLocked() { public boolean isReadLocked() {
return (state & RBITS) != 0L; return (state & RBITS) != 0L;
} }
private void readObject(java.io.ObjectInputStream s) /**
throws java.io.IOException, ClassNotFoundException { * Queries the number of read locks held for this lock. This
s.defaultReadObject(); * method is designed for use in monitoring system state, not for
state = ORIGIN; // reset to unlocked state * synchronization control.
* @return the number of read locks held
*/
public int getReadLockCount() {
return getReadLockCount(state);
} }
/**
* Returns a string identifying this lock, as well as its lock
* state. The state, in brackets, includes the String {@code
* "Unlocked"} or the String {@code "Write-locked"} or the String
* {@code "Read-locks:"} followed by the current number of
* read-locks held.
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
long s = state;
return super.toString() +
((s & ABITS) == 0L ? "[Unlocked]" :
(s & WBIT) != 0L ? "[Write-locked]" :
"[Read-locks:" + getReadLockCount(s) + "]");
}
// views
/** /**
* Returns a plain {@link Lock} view of this StampedLock in which * Returns a plain {@link Lock} view of this StampedLock in which
* the {@link Lock#lock} method is mapped to {@link #readLock}, * the {@link Lock#lock} method is mapped to {@link #readLock},
@ -890,6 +912,12 @@ public class StampedLock implements java.io.Serializable {
} }
} }
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
state = ORIGIN; // reset to unlocked state
}
// internals // internals
/** /**
@ -901,7 +929,7 @@ public class StampedLock implements java.io.Serializable {
* @return new stamp on success, else zero * @return new stamp on success, else zero
*/ */
private long tryIncReaderOverflow(long s) { private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL // assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) { if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
++readerOverflow; ++readerOverflow;
@ -922,7 +950,7 @@ public class StampedLock implements java.io.Serializable {
* @return new stamp on success, else zero * @return new stamp on success, else zero
*/ */
private long tryDecReaderOverflow(long s) { private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL // assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) { if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next; int r; long next;
@ -1044,11 +1072,14 @@ public class StampedLock implements java.io.Serializable {
time = 0L; time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false); return cancelWaiter(node, node, false);
node.thread = Thread.currentThread(); Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport.park
node.thread = wt;
if (node.prev == p && p.status == WAITING && // recheck if (node.prev == p && p.status == WAITING && // recheck
(p != whead || (state & ABITS) != 0L)) (p != whead || (state & ABITS) != 0L))
U.park(false, time); U.park(false, time);
node.thread = null; node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true); return cancelWaiter(node, node, true);
} }
@ -1113,8 +1144,6 @@ public class StampedLock implements java.io.Serializable {
node.cowait = p.cowait, node)) { node.cowait = p.cowait, node)) {
node.thread = Thread.currentThread(); node.thread = Thread.currentThread();
for (long time;;) { for (long time;;) {
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
if (deadline == 0L) if (deadline == 0L)
time = 0L; time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) else if ((time = deadline - System.nanoTime()) <= 0L)
@ -1126,9 +1155,14 @@ public class StampedLock implements java.io.Serializable {
node.thread = null; node.thread = null;
break; break;
} }
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
if (node.thread == null) // must recheck if (node.thread == null) // must recheck
break; break;
U.park(false, time); U.park(false, time);
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
} }
group = p; group = p;
} }
@ -1184,11 +1218,14 @@ public class StampedLock implements java.io.Serializable {
time = 0L; time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false); return cancelWaiter(node, node, false);
node.thread = Thread.currentThread(); Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (node.prev == p && p.status == WAITING && if (node.prev == p && p.status == WAITING &&
(p != whead || (state & ABITS) != WBIT)) (p != whead || (state & ABITS) != WBIT))
U.park(false, time); U.park(false, time);
node.thread = null; node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true); return cancelWaiter(node, node, true);
} }
@ -1289,6 +1326,7 @@ public class StampedLock implements java.io.Serializable {
private static final long WNEXT; private static final long WNEXT;
private static final long WSTATUS; private static final long WSTATUS;
private static final long WCOWAIT; private static final long WCOWAIT;
private static final long PARKBLOCKER;
static { static {
try { try {
@ -1307,6 +1345,9 @@ public class StampedLock implements java.io.Serializable {
(wk.getDeclaredField("next")); (wk.getDeclaredField("next"));
WCOWAIT = U.objectFieldOffset WCOWAIT = U.objectFieldOffset
(wk.getDeclaredField("cowait")); (wk.getDeclaredField("cowait"));
Class<?> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
} catch (Exception e) { } catch (Exception e) {
throw new Error(e); throw new Error(e);

View File

@ -19,7 +19,7 @@ import static java.lang.Double.longBitsToDouble;
* this class does extend {@code Number} to allow uniform access by * this class does extend {@code Number} to allow uniform access by
* tools and utilities that deal with numerically-based classes. * tools and utilities that deal with numerically-based classes.
* *
* <p><a name="bitEquals">This class compares primitive {@code double} * <p id="bitEquals">This class compares primitive {@code double}
* values in methods such as {@link #compareAndSet} by comparing their * values in methods such as {@link #compareAndSet} by comparing their
* bitwise representation using {@link Double#doubleToRawLongBits}, * bitwise representation using {@link Double#doubleToRawLongBits},
* which differs from both the primitive double {@code ==} operator * which differs from both the primitive double {@code ==} operator
@ -29,7 +29,7 @@ import static java.lang.Double.longBitsToDouble;
* long xBits = Double.doubleToRawLongBits(x); * long xBits = Double.doubleToRawLongBits(x);
* long yBits = Double.doubleToRawLongBits(y); * long yBits = Double.doubleToRawLongBits(y);
* return xBits == yBits; * return xBits == yBits;
* }}</pre></a> * }}</pre>
* *
* @see jsr166e.DoubleAdder * @see jsr166e.DoubleAdder
* @see jsr166e.DoubleMaxUpdater * @see jsr166e.DoubleMaxUpdater
@ -123,11 +123,10 @@ public class AtomicDouble extends Number implements java.io.Serializable {
* if the current value is <a href="#bitEquals">bitwise equal</a> * if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value. * to the expected value.
* *
* <p>May <a * <p><a
* href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious"> * href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious">
* fail spuriously</a> * May fail spuriously and does not provide ordering guarantees</a>,
* and does not provide ordering guarantees, so is only rarely an * so is only rarely an appropriate alternative to {@code compareAndSet}.
* appropriate alternative to {@code compareAndSet}.
* *
* @param expect the expected value * @param expect the expected value
* @param update the new value * @param update the new value

View File

@ -14,7 +14,7 @@ import static java.lang.Double.longBitsToDouble;
* See the {@link java.util.concurrent.atomic} package specification * See the {@link java.util.concurrent.atomic} package specification
* for description of the properties of atomic variables. * for description of the properties of atomic variables.
* *
* <p><a name="bitEquals">This class compares primitive {@code double} * <p id="bitEquals">This class compares primitive {@code double}
* values in methods such as {@link #compareAndSet} by comparing their * values in methods such as {@link #compareAndSet} by comparing their
* bitwise representation using {@link Double#doubleToRawLongBits}, * bitwise representation using {@link Double#doubleToRawLongBits},
* which differs from both the primitive double {@code ==} operator * which differs from both the primitive double {@code ==} operator
@ -24,7 +24,7 @@ import static java.lang.Double.longBitsToDouble;
* long xBits = Double.doubleToRawLongBits(x); * long xBits = Double.doubleToRawLongBits(x);
* long yBits = Double.doubleToRawLongBits(y); * long yBits = Double.doubleToRawLongBits(y);
* return xBits == yBits; * return xBits == yBits;
* }}</pre></a> * }}</pre>
* *
* @author Doug Lea * @author Doug Lea
* @author Martin Buchholz * @author Martin Buchholz
@ -162,11 +162,10 @@ public class AtomicDoubleArray implements java.io.Serializable {
* if the current value is <a href="#bitEquals">bitwise equal</a> * if the current value is <a href="#bitEquals">bitwise equal</a>
* to the expected value. * to the expected value.
* *
* <p>May <a * <p><a
* href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious"> * href="http://download.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html#Spurious">
* fail spuriously</a> * May fail spuriously and does not provide ordering guarantees</a>,
* and does not provide ordering guarantees, so is only rarely an * so is only rarely an appropriate alternative to {@code compareAndSet}.
* appropriate alternative to {@code compareAndSet}.
* *
* @param i the index * @param i the index
* @param expect the expected value * @param expect the expected value

View File

@ -1,9 +0,0 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
// Built on 2013-02-21
package jsr166e;

View File

@ -26,5 +26,5 @@
* are those that directly implement this algorithmic design pattern. * are those that directly implement this algorithmic design pattern.
*/ */
// Built on 2013-02-21 // Built on 2013-07-10
package jsr166y; package jsr166y;