diff --git a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java index 9e47b160f..a3517bdf4 100644 --- a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java +++ b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java @@ -16,13 +16,13 @@ */ package org.apache.commons.lang3.concurrent; +import org.apache.commons.lang3.Validate; + import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.Validate; - /** *

* A specialized semaphore implementation that provides a number of @@ -99,13 +99,22 @@ import org.apache.commons.lang3.Validate; *

*

* Client code that uses {@code TimedSemaphore} has to call the - * {@link #acquire()} method in aach processing step. {@code TimedSemaphore} + * {@link #acquire()} method in each processing step. {@code TimedSemaphore} * keeps track of the number of invocations of the {@link #acquire()} method and * blocks the calling thread if the counter exceeds the limit specified. When * the timer signals the end of the time period the counter is reset and all * waiting threads are released. Then another cycle can start. *

*

+ * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This + * method checks whether the semaphore is under the specified limit and + * increases the internal counter if this is the case. The return value is then + * true, and the calling thread can continue with its action. + * If the semaphore is already at its limit, {@code tryAcquire()} immediately + * returns false without blocking; the calling thread must + * then abort its action. This usage scenario prevents blocking of threads. + *

+ *

* It is possible to modify the limit at any time using the * {@link #setLimit(int)} method. This is useful if the load produced by an * operation has to be adapted dynamically. In the example scenario with the @@ -280,7 +289,7 @@ public class TimedSemaphore { } /** - * Tries to acquire a permit from this semaphore. This method will block if + * Acquires a permit from this semaphore. This method will block if * the limit for the current period has already been reached. If * {@link #shutdown()} has already been invoked, calling this method will * cause an exception. The very first call of this method starts the timer @@ -291,25 +300,33 @@ public class TimedSemaphore { * @throws IllegalStateException if this semaphore is already shut down */ public synchronized void acquire() throws InterruptedException { - if (isShutdown()) { - throw new IllegalStateException("TimedSemaphore is shut down!"); - } + prepareAcquire(); - if (task == null) { - task = startTimer(); - } - - boolean canPass = false; + boolean canPass; do { - canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit(); + canPass = acquirePermit(); if (!canPass) { wait(); - } else { - acquireCount++; } } while (!canPass); } + /** + * Tries to acquire a permit from this semaphore. If the limit of this semaphore has + * not yet been reached, a permit is acquired, and this method returns + * true. Otherwise, this method returns immediately with the result + * false. + * + * @return true if a permit could be acquired; false + * otherwise + * @throws IllegalStateException if this semaphore is already shut down + * @since 3.5 + */ + public synchronized boolean tryAcquire() { + prepareAcquire(); + return acquirePermit(); + } + /** * Returns the number of (successful) acquire invocations during the last * period. This is the number of times the {@link #acquire()} method was @@ -420,4 +437,34 @@ public class TimedSemaphore { acquireCount = 0; notifyAll(); } + + /** + * Prepares an acquire operation. Checks for the current state and starts the internal + * timer if necessary. This method must be called with the lock of this object held. + */ + private void prepareAcquire() { + if (isShutdown()) { + throw new IllegalStateException("TimedSemaphore is shut down!"); + } + + if (task == null) { + task = startTimer(); + } + } + + /** + * Internal helper method for acquiring a permit. This method checks whether currently + * a permit can be acquired and - if so - increases the internal counter. The return + * value indicates whether a permit could be acquired. This method must be called with + * the lock of this object held. + * + * @return a flag whether a permit could be acquired + */ + private boolean acquirePermit() { + if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) { + acquireCount++; + return true; + } + return false; + } } diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java index 067043deb..351102154 100644 --- a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java +++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java @@ -384,6 +384,42 @@ public class TimedSemaphoreTest { EasyMock.verify(service, future); } + /** + * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired + * by a bunch of threads the expected number of times and not more. + */ + @Test + public void testTryAcquire() throws InterruptedException { + final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, TimeUnit.SECONDS, + LIMIT); + TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT]; + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new TryAcquireThread(semaphore, latch); + threads[i].start(); + } + + latch.countDown(); + int permits = 0; + for (TryAcquireThread t : threads) { + t.join(); + if (t.acquired) { + permits++; + } + } + assertEquals("Wrong number of permits granted", LIMIT, permits); + } + + /** + * Tries to call tryAcquire() after shutdown(). This should cause an exception. + */ + @Test(expected = IllegalStateException.class) + public void testTryAcquireAfterShutdown() { + final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); + semaphore.shutdown(); + semaphore.tryAcquire(); + } + /** * A specialized implementation of {@code TimedSemaphore} that is easier to * test. @@ -495,4 +531,35 @@ public class TimedSemaphoreTest { } } } + + /** + * A test thread class which invokes {@code tryAcquire()} on the test semaphore and + * records the return value. + */ + private static class TryAcquireThread extends Thread { + /** The semaphore. */ + private final TimedSemaphore semaphore; + + /** A latch for communication with the main thread. */ + private final CountDownLatch latch; + + /** Flag whether a permit could be acquired. */ + private boolean acquired; + + public TryAcquireThread(TimedSemaphore s, CountDownLatch l) { + semaphore = s; + latch = l; + } + + @Override + public void run() { + try { + if (latch.await(10, TimeUnit.SECONDS)) { + acquired = semaphore.tryAcquire(); + } + } catch (InterruptedException iex) { + // ignore + } + } + } }