diff --git a/pom.xml b/pom.xml index 97a6b8d08..4ca08f313 100644 --- a/pom.xml +++ b/pom.xml @@ -415,7 +415,14 @@ 4.7 test - + + + org.easymock + easymock + 2.5.2 + test + + UTF-8 diff --git a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java new file mode 100644 index 000000000..21f23846b --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + *

+ * A specialized semaphore implementation that provides a number of + * permits in a given time frame. + *

+ *

+ * This class is similar to the {@code java.util.concurrent.Semaphore} class + * provided by the JDK in that it manages a configurable number of permits. + * Using the {@link #acquire()} method a permit can be requested by a thread. + * However, there is an additional timing dimension: there is no {@code + * release()} method for freeing a permit, but all permits are automatically + * released at the end of a configurable time frame. If a thread calls + * {@link #acquire()} and the available permits are already exhausted for this + * time frame, the thread is blocked. When the time frame ends all permits + * requested so far are restored, and blocking threads are waked up again, so + * that they can try to acquire a new permit. This basically means that in the + * specified time frame only the given number of operations is possible. + *

+ *

+ * A use case for this class is to artificially limit the load produced by a + * process. As an example consider an application that issues database queries + * on a production system in a background process to gather statistical + * information. This background processing should not produce so much database + * load that the functionality and the performance of the production system are + * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that + * only a given number of database queries are issued per second. + *

+ *

+ * A thread class for performing database queries could look as follows: + * + *

+ * public class StatisticsThread extends Thread {
+ *     // The semaphore for limiting database load.
+ *     private final TimedSemaphore semaphore;
+ *     // Create an instance and set the semaphore
+ *     public StatisticsThread(TimedSemaphore timedSemaphore) {
+ *         semaphore = timedSemaphore;
+ *     }
+ *     // Gather statistics
+ *     public void run() {
+ *         try {
+ *             while(true) {
+ *                 semaphore.acquire();   // limit database load
+ *                 performQuery();        // issue a query
+ *             }
+ *         } catch(InterruptedException) {
+ *             // fall through
+ *         }
+ *     }
+ *     ...
+ * }
+ * 
+ * + * The following code fragment shows how a {@code TimedSemaphore} is created + * that allows only 10 operations per second and passed to the statistics + * thread: + * + *
+ * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
+ * StatisticsThread thread = new StatisticsThread(sem);
+ * thread.start();
+ * 
+ * + *

+ *

+ * When creating an instance the time period for the semaphore must be + * specified. {@code TimedSemaphore} uses an executor service with a + * corresponding period to monitor this interval. The {@code + * ScheduledExecutorService} to be used for this purpose can be provided at + * construction time. Alternatively the class creates an internal executor + * service. + *

+ *

+ * Client code that uses {@code TimedSemaphore} has to call the + * {@link #acquire()} method in aach processing step. {@code TimedSemaphore} + * keeps track of the number of invocations of the {@link #acquire()} method and + * blocks the calling thread if the counter exceeds the limit specified. When + * the timer signals the end of the time period the counter is reset and all + * waiting threads are released. Then another cycle can start. + *

+ *

+ * It is possible to modify the limit at any time using the + * {@link #setLimit(int)} method. This is useful if the load produced by an + * operation has to be adapted dynamically. In the example scenario with the + * thread collecting statistics it may make sense to specify a low limit during + * day time while allowing a higher load in the night time. Reducing the limit + * takes effect immediately by blocking incoming callers. If the limit is + * increased, waiting threads are not released immediately, but wake up when the + * timer runs out. Then, in the next period more processing steps can be + * performed without blocking. By setting the limit to 0 the semaphore can be + * switched off: in this mode the {@link #acquire()} method never blocks, but + * lets all callers pass directly. + *

+ *

+ * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()} + * method should be called. This causes the periodic task that monitors the time + * interval to be canceled. If the {@code ScheduledExecutorService} has been + * created by the semaphore at construction time, it is also shut down. + * resources. After that {@link #acquire()} must not be called any more. + *

+ * + * @version $Id:$ + */ +public class TimedSemaphore { + /** + * Constant for a value representing no limit. If the limit is set to a + * value less or equal this constant, the {@code TimedSemaphore} will be + * effectively switched off. + */ + public static final int NO_LIMIT = 0; + + /** Constant for the thread pool size for the executor. */ + private static final int THREAD_POOL_SIZE = 1; + + /** The executor service for managing the timer thread. */ + private final ScheduledExecutorService executorService; + + /** Stores the period for this timed semaphore. */ + private final long period; + + /** The time unit for the period. */ + private final TimeUnit unit; + + /** A flag whether the executor service was created by this object. */ + private final boolean ownExecutor; + + /** A future object representing the timer task. */ + private ScheduledFuture task; + + /** Stores the total number of invocations of the acquire() method. */ + private long totalAcquireCount; + + /** + * The counter for the periods. This counter is increased every time a + * period ends. + */ + private long periodCount; + + /** The limit. */ + private int limit; + + /** The current counter. */ + private int acquireCount; + + /** The number of invocations of acquire() in the last period. */ + private int lastCallsPerPeriod; + + /** A flag whether shutdown() was called. */ + private boolean shutdown; + + /** + * Creates a new instance of {@link TimedSemaphore} and initializes it with + * the given time period and the limit. + * + * @param timePeriod the time period + * @param timeUnit the unit for the period + * @param limit the limit for the semaphore + * @throws IllegalArgumentException if the period is less or equals 0 + */ + public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) { + this(null, timePeriod, timeUnit, limit); + } + + /** + * Creates a new instance of {@link TimedSemaphore} and initializes it with + * an executor service, the given time period, and the limit. The executor + * service will be used for creating a periodic task for monitoring the time + * period. It can be null, then a default service will be created. + * + * @param service the executor service + * @param timePeriod the time period + * @param timeUnit the unit for the period + * @param limit the limit for the semaphore + * @throws IllegalArgumentException if the period is less or equals 0 + */ + public TimedSemaphore(ScheduledExecutorService service, long timePeriod, + TimeUnit timeUnit, int limit) { + if (timePeriod <= 0) { + throw new IllegalArgumentException("Time period must be greater 0!"); + } + + period = timePeriod; + unit = timeUnit; + + if (service != null) { + executorService = service; + ownExecutor = false; + } else { + ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor( + THREAD_POOL_SIZE); + s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + executorService = s; + ownExecutor = true; + } + + setLimit(limit); + } + + /** + * Returns the limit enforced by this semaphore. The limit determines how + * many invocations of {@link #acquire()} are allowed within the monitored + * period. + * + * @return the limit + */ + public final synchronized int getLimit() { + return limit; + } + + /** + * Sets the limit. This is the number of times the {@link #acquire()} method + * can be called within the time period specified. If this limit is reached, + * further invocations of {@link #acquire()} will block. Setting the limit + * to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, + * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in + * the time period. + * + * @param limit the limit + */ + public final synchronized void setLimit(int limit) { + this.limit = limit; + } + + /** + * Initializes a shutdown. After that the object cannot be used any more. + * This method can be invoked an arbitrary number of times. All invocations + * after the first one do not have any effect. + */ + public synchronized void shutdown() { + if (!shutdown) { + + if (ownExecutor) { + // if the executor was created by this instance, it has + // to be shutdown + getExecutorService().shutdownNow(); + } + if (task != null) { + task.cancel(false); + } + + shutdown = true; + } + } + + /** + * Tests whether the {@link #shutdown()} method has been called on this + * object. If this method returns true, this instance cannot be used + * any longer. + * + * @return a flag whether a shutdown has been performed + */ + public synchronized boolean isShutdown() { + return shutdown; + } + + /** + * Tries to acquire a permit from this semaphore. This method will block if + * the limit for the current period has already been reached. If + * {@link #shutdown()} has already been invoked, calling this method will + * cause an exception. The very first call of this method starts the timer + * task which monitors the time period set for this {@code TimedSemaphore}. + * From now on the semaphore is active. + * + * @throws InterruptedException if the thread gets interrupted + * @throws IllegalStateException if this semaphore is already shut down + */ + public synchronized void acquire() throws InterruptedException { + if (isShutdown()) { + throw new IllegalStateException("TimedSemaphore is shut down!"); + } + + if (task == null) { + task = startTimer(); + } + + boolean canPass = false; + do { + canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit(); + if (!canPass) { + wait(); + } else { + acquireCount++; + } + } while (!canPass); + } + + /** + * Returns the number of (successful) acquire invocations during the last + * period. This is the number of times the {@link #acquire()} method was + * called without blocking. This can be useful for testing or debugging + * purposes or to determine a meaningful threshold value. If a limit is set, + * the value returned by this method won't be greater than this limit. + * + * @return the number of non-blocking invocations of the {@link #acquire()} + * method + */ + public synchronized int getLastAcquiresPerPeriod() { + return lastCallsPerPeriod; + } + + /** + * Returns the number of invocations of the {@link #acquire()} method for + * the current period. This may be useful for testing or debugging purposes. + * + * @return the current number of {@link #acquire()} invocations + */ + public synchronized int getAcquireCount() { + return acquireCount; + } + + /** + * Returns the number of calls to the {@link #acquire()} method that can + * still be performed in the current period without blocking. This method + * can give an indication whether it is safe to call the {@link #acquire()} + * method without risking to be suspended. However, there is no guarantee + * that a subsequent call to {@link #acquire()} actually is not-blocking + * because in the mean time other threads may have invoked the semaphore. + * + * @return the current number of available {@link #acquire()} calls in the + * current period + */ + public synchronized int getAvailablePermits() { + return getLimit() - getAcquireCount(); + } + + /** + * Returns the average number of successful (i.e. non-blocking) + * {@link #acquire()} invocations for the entire life-time of this {@code + * TimedSemaphore}. This method can be used for instance for statistical + * calculations. + * + * @return the average number of {@link #acquire()} invocations per time + * unit + */ + public synchronized double getAverageCallsPerPeriod() { + return (periodCount == 0) ? 0 : (double) totalAcquireCount + / (double) periodCount; + } + + /** + * Returns the time period. This is the time monitored by this semaphore. + * Only a given number of invocations of the {@link #acquire()} method is + * possible in this period. + * + * @return the time period + */ + public long getPeriod() { + return period; + } + + /** + * Returns the time unit. This is the unit used by {@link #getPeriod()}. + * + * @return the time unit + */ + public TimeUnit getUnit() { + return unit; + } + + /** + * Returns the executor service used by this instance. + * + * @return the executor service + */ + protected ScheduledExecutorService getExecutorService() { + return executorService; + } + + /** + * Starts the timer. This method is called when {@link #acquire()} is called + * for the first time. It schedules a task to be executed at fixed rate to + * monitor the time period specified. + * + * @return a future object representing the task scheduled + */ + protected ScheduledFuture startTimer() { + return getExecutorService().scheduleAtFixedRate(new Runnable() { + public void run() { + endOfPeriod(); + } + }, getPeriod(), getPeriod(), getUnit()); + } + + /** + * The current time period is finished. This method is called by the timer + * used internally to monitor the time period. It resets the counter and + * releases the threads waiting for this barrier. + */ + synchronized void endOfPeriod() { + lastCallsPerPeriod = acquireCount; + totalAcquireCount += acquireCount; + periodCount++; + acquireCount = 0; + notifyAll(); + } +} diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java new file mode 100644 index 000000000..b8ba9579d --- /dev/null +++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.concurrent.TimedSemaphore; +import org.easymock.EasyMock; +import org.junit.Test; + +/** + * Test class for TimedSemaphore. + * + * @version $Id$ + */ +public class TimedSemaphoreTest { + /** Constant for the time period. */ + private static final long PERIOD = 500; + + /** Constant for the time unit. */ + private static final TimeUnit UNIT = TimeUnit.MILLISECONDS; + + /** Constant for the default limit. */ + private static final int LIMIT = 10; + + /** + * Tests creating a new instance. + */ + @Test + public void testInit() { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + EasyMock.replay(service); + TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, + LIMIT); + EasyMock.verify(service); + assertEquals("Wrong service", service, semaphore.getExecutorService()); + assertEquals("Wrong period", PERIOD, semaphore.getPeriod()); + assertEquals("Wrong unit", UNIT, semaphore.getUnit()); + assertEquals("Statistic available", 0, semaphore + .getLastAcquiresPerPeriod()); + assertEquals("Average available", 0.0, semaphore + .getAverageCallsPerPeriod(), .05); + assertFalse("Already shutdown", semaphore.isShutdown()); + assertEquals("Wrong limit", LIMIT, semaphore.getLimit()); + } + + /** + * Tries to create an instance with a negative period. This should cause an + * exception. + */ + @Test(expected = IllegalArgumentException.class) + public void testInitInvalidPeriod() { + new TimedSemaphore(0L, UNIT, LIMIT); + } + + /** + * Tests whether a default executor service is created if no service is + * provided. + */ + @Test + public void testInitDefaultService() { + TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); + ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore + .getExecutorService(); + assertFalse("Wrong periodic task policy", exec + .getContinueExistingPeriodicTasksAfterShutdownPolicy()); + assertFalse("Wrong delayed task policy", exec + .getExecuteExistingDelayedTasksAfterShutdownPolicy()); + assertFalse("Already shutdown", exec.isShutdown()); + semaphore.shutdown(); + } + + /** + * Tests starting the timer. + */ + @Test + public void testStartTimer() throws InterruptedException { + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD, + UNIT, LIMIT); + ScheduledFuture future = semaphore.startTimer(); + assertNotNull("No future returned", future); + Thread.sleep(PERIOD); + final int trials = 10; + int count = 0; + do { + Thread.sleep(PERIOD); + if (count++ > trials) { + fail("endOfPeriod() not called!"); + } + } while (semaphore.getPeriodEnds() <= 0); + semaphore.shutdown(); + } + + /** + * Tests the shutdown() method if the executor belongs to the semaphore. In + * this case it has to be shut down. + */ + @Test + public void testShutdownOwnExecutor() { + TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); + semaphore.shutdown(); + assertTrue("Not shutdown", semaphore.isShutdown()); + assertTrue("Executor not shutdown", semaphore.getExecutorService() + .isShutdown()); + } + + /** + * Tests the shutdown() method for a shared executor service before a task + * was started. This should do pretty much nothing. + */ + @Test + public void testShutdownSharedExecutorNoTask() { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + EasyMock.replay(service); + TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, + LIMIT); + semaphore.shutdown(); + assertTrue("Not shutdown", semaphore.isShutdown()); + EasyMock.verify(service); + } + + /** + * Prepares an executor service mock to expect the start of the timer. + * + * @param service the mock + * @param future the future + */ + private void prepareStartTimer(ScheduledExecutorService service, + ScheduledFuture future) { + service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock + .eq(PERIOD), EasyMock.eq(PERIOD), EasyMock.eq(UNIT)); + EasyMock.expectLastCall().andReturn(future); + } + + /** + * Tests the shutdown() method for a shared executor after the task was + * started. In this case the task must be canceled. + */ + @Test + public void testShutdownSharedExecutorTask() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.expect(future.cancel(false)).andReturn(true); + EasyMock.replay(service, future); + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, + PERIOD, UNIT, LIMIT); + semaphore.acquire(); + semaphore.shutdown(); + assertTrue("Not shutdown", semaphore.isShutdown()); + EasyMock.verify(service, future); + } + + /** + * Tests multiple invocations of the shutdown() method. + */ + @Test + public void testShutdownMultipleTimes() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.expect(future.cancel(false)).andReturn(true); + EasyMock.replay(service, future); + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, + PERIOD, UNIT, LIMIT); + semaphore.acquire(); + for (int i = 0; i < 10; i++) { + semaphore.shutdown(); + } + EasyMock.verify(service, future); + } + + /** + * Tests the acquire() method if a limit is set. + */ + @Test + public void testAcquireLimit() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + final int count = 10; + CountDownLatch latch = new CountDownLatch(count - 1); + TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 1); + SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, + count - 1); + semaphore.setLimit(count - 1); + + // start a thread that calls the semaphore count times + t.start(); + latch.await(); + // now the semaphore's limit should be reached and the thread blocked + assertEquals("Wrong semaphore count", count - 1, semaphore + .getAcquireCount()); + + // this wakes up the thread, it should call the semaphore once more + semaphore.endOfPeriod(); + t.join(); + assertEquals("Wrong semaphore count (2)", 1, semaphore + .getAcquireCount()); + assertEquals("Wrong acquire() count", count - 1, semaphore + .getLastAcquiresPerPeriod()); + EasyMock.verify(service, future); + } + + /** + * Tests the acquire() method if more threads are involved than the limit. + * This method starts a number of threads that all invoke the semaphore. The + * semaphore's limit is set to 1, so in each period only a single thread can + * acquire the semaphore. + */ + @Test + public void testAcquireMultipleThreads() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, + PERIOD, UNIT, 1); + semaphore.latch = new CountDownLatch(1); + final int count = 10; + SemaphoreThread[] threads = new SemaphoreThread[count]; + for (int i = 0; i < count; i++) { + threads[i] = new SemaphoreThread(semaphore, null, 1, 0); + threads[i].start(); + } + for (int i = 0; i < count; i++) { + semaphore.latch.await(); + assertEquals("Wrong count", 1, semaphore.getAcquireCount()); + semaphore.latch = new CountDownLatch(1); + semaphore.endOfPeriod(); + assertEquals("Wrong acquire count", 1, semaphore + .getLastAcquiresPerPeriod()); + } + for (int i = 0; i < count; i++) { + threads[i].join(); + } + EasyMock.verify(service, future); + } + + /** + * Tests the acquire() method if no limit is set. A test thread is started + * that calls the semaphore a large number of times. Even if the semaphore's + * period does not end, the thread should never block. + */ + @Test + public void testAcquireNoLimit() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service, + PERIOD, UNIT, TimedSemaphore.NO_LIMIT); + final int count = 1000; + CountDownLatch latch = new CountDownLatch(count); + SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count); + t.start(); + latch.await(); + EasyMock.verify(service, future); + } + + /** + * Tries to call acquire() after shutdown(). This should cause an exception. + */ + @Test(expected = IllegalStateException.class) + public void testPassAfterShutdown() throws InterruptedException { + TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT); + semaphore.shutdown(); + semaphore.acquire(); + } + + /** + * Tests a bigger number of invocations that span multiple periods. The + * period is set to a very short time. A background thread calls the + * semaphore a large number of times. While it runs at last one end of a + * period should be reached. + */ + @Test + public void testAcquireMultiplePeriods() throws InterruptedException { + final int count = 1000; + TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl( + PERIOD / 10, TimeUnit.MILLISECONDS, 1); + semaphore.setLimit(count / 4); + CountDownLatch latch = new CountDownLatch(count); + SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count); + t.start(); + latch.await(); + semaphore.shutdown(); + assertTrue("End of period not reached", semaphore.getPeriodEnds() > 0); + } + + /** + * Tests the methods for statistics. + */ + @Test + public void testGetAverageCallsPerPeriod() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, + LIMIT); + semaphore.acquire(); + semaphore.endOfPeriod(); + assertEquals("Wrong average (1)", 1.0, semaphore + .getAverageCallsPerPeriod(), .005); + semaphore.acquire(); + semaphore.acquire(); + semaphore.endOfPeriod(); + assertEquals("Wrong average (2)", 1.5, semaphore + .getAverageCallsPerPeriod(), .005); + EasyMock.verify(service, future); + } + + /** + * Tests whether the available non-blocking calls can be queried. + */ + @Test + public void testGetAvailablePermits() throws InterruptedException { + ScheduledExecutorService service = EasyMock + .createMock(ScheduledExecutorService.class); + ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, + LIMIT); + for (int i = 0; i < LIMIT; i++) { + assertEquals("Wrong available count at " + i, LIMIT - i, semaphore + .getAvailablePermits()); + semaphore.acquire(); + } + semaphore.endOfPeriod(); + assertEquals("Wrong available count in new period", LIMIT, semaphore + .getAvailablePermits()); + EasyMock.verify(service, future); + } + + /** + * A specialized implementation of {@code TimedSemaphore} that is easier to + * test. + */ + private static class TimedSemaphoreTestImpl extends TimedSemaphore { + /** A mock scheduled future. */ + ScheduledFuture schedFuture; + + /** A latch for synchronizing with the main thread. */ + volatile CountDownLatch latch; + + /** Counter for the endOfPeriod() invocations. */ + private int periodEnds; + + public TimedSemaphoreTestImpl(long timePeriod, TimeUnit timeUnit, + int limit) { + super(timePeriod, timeUnit, limit); + } + + public TimedSemaphoreTestImpl(ScheduledExecutorService service, + long timePeriod, TimeUnit timeUnit, int limit) { + super(service, timePeriod, timeUnit, limit); + } + + /** + * Returns the number of invocations of the endOfPeriod() method. + * + * @return the endOfPeriod() invocations + */ + public int getPeriodEnds() { + synchronized (this) { + return periodEnds; + } + } + + /** + * Invokes the latch if one is set. + */ + @Override + public void acquire() throws InterruptedException { + super.acquire(); + if (latch != null) { + latch.countDown(); + } + } + + /** + * Counts the number of invocations. + */ + @Override + protected void endOfPeriod() { + super.endOfPeriod(); + synchronized (this) { + periodEnds++; + } + } + + /** + * Either returns the mock future or calls the super method. + */ + @Override + protected ScheduledFuture startTimer() { + return (schedFuture != null) ? schedFuture : super.startTimer(); + } + } + + /** + * A test thread class that will be used by tests for triggering the + * semaphore. The thread calls the semaphore a configurable number of times. + * When this is done, it can notify the main thread. + */ + private static class SemaphoreThread extends Thread { + /** The semaphore. */ + private final TimedSemaphore semaphore; + + /** A latch for communication with the main thread. */ + private final CountDownLatch latch; + + /** The number of acquire() calls. */ + private final int count; + + /** The number of invocations of the latch. */ + private final int latchCount; + + public SemaphoreThread(TimedSemaphore b, CountDownLatch l, int c, int lc) { + semaphore = b; + latch = l; + count = c; + latchCount = lc; + } + + /** + * Calls acquire() on the semaphore for the specified number of times. + * Optionally the latch will also be triggered to synchronize with the + * main test thread. + */ + @Override + public void run() { + try { + for (int i = 0; i < count; i++) { + semaphore.acquire(); + + if (i < latchCount) { + latch.countDown(); + } + } + } catch (InterruptedException iex) { + Thread.currentThread().interrupt(); + } + } + } +}