[LANG-1275] Added a tryAcquire() method to TimedSemaphore.

This method supports non-blocking use cases.
This commit is contained in:
oheger 2016-10-08 16:47:10 +02:00
parent 496506dedd
commit 809e2bed22
2 changed files with 129 additions and 15 deletions

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.commons.lang3.concurrent; package org.apache.commons.lang3.concurrent;
import org.apache.commons.lang3.Validate;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Validate;
/** /**
* <p> * <p>
* A specialized <em>semaphore</em> implementation that provides a number of * A specialized <em>semaphore</em> implementation that provides a number of
@ -99,13 +99,22 @@ import org.apache.commons.lang3.Validate;
* </p> * </p>
* <p> * <p>
* Client code that uses {@code TimedSemaphore} has to call the * Client code that uses {@code TimedSemaphore} has to call the
* {@link #acquire()} method in aach processing step. {@code TimedSemaphore} * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
* keeps track of the number of invocations of the {@link #acquire()} method and * keeps track of the number of invocations of the {@link #acquire()} method and
* blocks the calling thread if the counter exceeds the limit specified. When * blocks the calling thread if the counter exceeds the limit specified. When
* the timer signals the end of the time period the counter is reset and all * the timer signals the end of the time period the counter is reset and all
* waiting threads are released. Then another cycle can start. * waiting threads are released. Then another cycle can start.
* </p> * </p>
* <p> * <p>
* An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
* method checks whether the semaphore is under the specified limit and
* increases the internal counter if this is the case. The return value is then
* <strong>true</strong>, and the calling thread can continue with its action.
* If the semaphore is already at its limit, {@code tryAcquire()} immediately
* returns <strong>false</strong> without blocking; the calling thread must
* then abort its action. This usage scenario prevents blocking of threads.
* </p>
* <p>
* It is possible to modify the limit at any time using the * It is possible to modify the limit at any time using the
* {@link #setLimit(int)} method. This is useful if the load produced by an * {@link #setLimit(int)} method. This is useful if the load produced by an
* operation has to be adapted dynamically. In the example scenario with the * operation has to be adapted dynamically. In the example scenario with the
@ -280,7 +289,7 @@ public class TimedSemaphore {
} }
/** /**
* Tries to acquire a permit from this semaphore. This method will block if * Acquires a permit from this semaphore. This method will block if
* the limit for the current period has already been reached. If * the limit for the current period has already been reached. If
* {@link #shutdown()} has already been invoked, calling this method will * {@link #shutdown()} has already been invoked, calling this method will
* cause an exception. The very first call of this method starts the timer * cause an exception. The very first call of this method starts the timer
@ -291,25 +300,33 @@ public class TimedSemaphore {
* @throws IllegalStateException if this semaphore is already shut down * @throws IllegalStateException if this semaphore is already shut down
*/ */
public synchronized void acquire() throws InterruptedException { public synchronized void acquire() throws InterruptedException {
if (isShutdown()) { prepareAcquire();
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) { boolean canPass;
task = startTimer();
}
boolean canPass = false;
do { do {
canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit(); canPass = acquirePermit();
if (!canPass) { if (!canPass) {
wait(); wait();
} else {
acquireCount++;
} }
} while (!canPass); } while (!canPass);
} }
/**
* Tries to acquire a permit from this semaphore. If the limit of this semaphore has
* not yet been reached, a permit is acquired, and this method returns
* <strong>true</strong>. Otherwise, this method returns immediately with the result
* <strong>false</strong>.
*
* @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
* otherwise
* @throws IllegalStateException if this semaphore is already shut down
* @since 3.5
*/
public synchronized boolean tryAcquire() {
prepareAcquire();
return acquirePermit();
}
/** /**
* Returns the number of (successful) acquire invocations during the last * Returns the number of (successful) acquire invocations during the last
* period. This is the number of times the {@link #acquire()} method was * period. This is the number of times the {@link #acquire()} method was
@ -420,4 +437,34 @@ public class TimedSemaphore {
acquireCount = 0; acquireCount = 0;
notifyAll(); notifyAll();
} }
/**
* Prepares an acquire operation. Checks for the current state and starts the internal
* timer if necessary. This method must be called with the lock of this object held.
*/
private void prepareAcquire() {
if (isShutdown()) {
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) {
task = startTimer();
}
}
/**
* Internal helper method for acquiring a permit. This method checks whether currently
* a permit can be acquired and - if so - increases the internal counter. The return
* value indicates whether a permit could be acquired. This method must be called with
* the lock of this object held.
*
* @return a flag whether a permit could be acquired
*/
private boolean acquirePermit() {
if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
acquireCount++;
return true;
}
return false;
}
} }

View File

@ -384,6 +384,42 @@ public class TimedSemaphoreTest {
EasyMock.verify(service, future); EasyMock.verify(service, future);
} }
/**
* Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
* by a bunch of threads the expected number of times and not more.
*/
@Test
public void testTryAcquire() throws InterruptedException {
final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, TimeUnit.SECONDS,
LIMIT);
TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new TryAcquireThread(semaphore, latch);
threads[i].start();
}
latch.countDown();
int permits = 0;
for (TryAcquireThread t : threads) {
t.join();
if (t.acquired) {
permits++;
}
}
assertEquals("Wrong number of permits granted", LIMIT, permits);
}
/**
* Tries to call tryAcquire() after shutdown(). This should cause an exception.
*/
@Test(expected = IllegalStateException.class)
public void testTryAcquireAfterShutdown() {
final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
semaphore.shutdown();
semaphore.tryAcquire();
}
/** /**
* A specialized implementation of {@code TimedSemaphore} that is easier to * A specialized implementation of {@code TimedSemaphore} that is easier to
* test. * test.
@ -495,4 +531,35 @@ public class TimedSemaphoreTest {
} }
} }
} }
/**
* A test thread class which invokes {@code tryAcquire()} on the test semaphore and
* records the return value.
*/
private static class TryAcquireThread extends Thread {
/** The semaphore. */
private final TimedSemaphore semaphore;
/** A latch for communication with the main thread. */
private final CountDownLatch latch;
/** Flag whether a permit could be acquired. */
private boolean acquired;
public TryAcquireThread(TimedSemaphore s, CountDownLatch l) {
semaphore = s;
latch = l;
}
@Override
public void run() {
try {
if (latch.await(10, TimeUnit.SECONDS)) {
acquired = semaphore.tryAcquire();
}
} catch (InterruptedException iex) {
// ignore
}
}
}
} }