[LANG-560] Added TimedSemaphore class to concurrent package.
git-svn-id: https://svn.apache.org/repos/asf/commons/proper/lang/trunk@895466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
76e14e483c
commit
ee00c7a6f7
9
pom.xml
9
pom.xml
|
@ -415,7 +415,14 @@
|
|||
<version>4.7</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.easymock</groupId>
|
||||
<artifactId>easymock</artifactId>
|
||||
<version>2.5.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
|
|
|
@ -0,0 +1,420 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.commons.lang3.concurrent;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A specialized <em>semaphore</em> implementation that provides a number of
|
||||
* permits in a given time frame.
|
||||
* </p>
|
||||
* <p>
|
||||
* This class is similar to the {@code java.util.concurrent.Semaphore} class
|
||||
* provided by the JDK in that it manages a configurable number of permits.
|
||||
* Using the {@link #acquire()} method a permit can be requested by a thread.
|
||||
* However, there is an additional timing dimension: there is no {@code
|
||||
* release()} method for freeing a permit, but all permits are automatically
|
||||
* released at the end of a configurable time frame. If a thread calls
|
||||
* {@link #acquire()} and the available permits are already exhausted for this
|
||||
* time frame, the thread is blocked. When the time frame ends all permits
|
||||
* requested so far are restored, and blocking threads are waked up again, so
|
||||
* that they can try to acquire a new permit. This basically means that in the
|
||||
* specified time frame only the given number of operations is possible.
|
||||
* </p>
|
||||
* <p>
|
||||
* A use case for this class is to artificially limit the load produced by a
|
||||
* process. As an example consider an application that issues database queries
|
||||
* on a production system in a background process to gather statistical
|
||||
* information. This background processing should not produce so much database
|
||||
* load that the functionality and the performance of the production system are
|
||||
* impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
|
||||
* only a given number of database queries are issued per second.
|
||||
* </p>
|
||||
* <p>
|
||||
* A thread class for performing database queries could look as follows:
|
||||
*
|
||||
* <pre>
|
||||
* public class StatisticsThread extends Thread {
|
||||
* // The semaphore for limiting database load.
|
||||
* private final TimedSemaphore semaphore;
|
||||
* // Create an instance and set the semaphore
|
||||
* public StatisticsThread(TimedSemaphore timedSemaphore) {
|
||||
* semaphore = timedSemaphore;
|
||||
* }
|
||||
* // Gather statistics
|
||||
* public void run() {
|
||||
* try {
|
||||
* while(true) {
|
||||
* semaphore.acquire(); // limit database load
|
||||
* performQuery(); // issue a query
|
||||
* }
|
||||
* } catch(InterruptedException) {
|
||||
* // fall through
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* The following code fragment shows how a {@code TimedSemaphore} is created
|
||||
* that allows only 10 operations per second and passed to the statistics
|
||||
* thread:
|
||||
*
|
||||
* <pre>
|
||||
* TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
|
||||
* StatisticsThread thread = new StatisticsThread(sem);
|
||||
* thread.start();
|
||||
* </pre>
|
||||
*
|
||||
* </p>
|
||||
* <p>
|
||||
* When creating an instance the time period for the semaphore must be
|
||||
* specified. {@code TimedSemaphore} uses an executor service with a
|
||||
* corresponding period to monitor this interval. The {@code
|
||||
* ScheduledExecutorService} to be used for this purpose can be provided at
|
||||
* construction time. Alternatively the class creates an internal executor
|
||||
* service.
|
||||
* </p>
|
||||
* <p>
|
||||
* Client code that uses {@code TimedSemaphore} has to call the
|
||||
* {@link #acquire()} method in aach processing step. {@code TimedSemaphore}
|
||||
* keeps track of the number of invocations of the {@link #acquire()} method and
|
||||
* blocks the calling thread if the counter exceeds the limit specified. When
|
||||
* the timer signals the end of the time period the counter is reset and all
|
||||
* waiting threads are released. Then another cycle can start.
|
||||
* </p>
|
||||
* <p>
|
||||
* It is possible to modify the limit at any time using the
|
||||
* {@link #setLimit(int)} method. This is useful if the load produced by an
|
||||
* operation has to be adapted dynamically. In the example scenario with the
|
||||
* thread collecting statistics it may make sense to specify a low limit during
|
||||
* day time while allowing a higher load in the night time. Reducing the limit
|
||||
* takes effect immediately by blocking incoming callers. If the limit is
|
||||
* increased, waiting threads are not released immediately, but wake up when the
|
||||
* timer runs out. Then, in the next period more processing steps can be
|
||||
* performed without blocking. By setting the limit to 0 the semaphore can be
|
||||
* switched off: in this mode the {@link #acquire()} method never blocks, but
|
||||
* lets all callers pass directly.
|
||||
* </p>
|
||||
* <p>
|
||||
* When the {@code TimedSemaphore} is no more needed its {@link #shutdown()}
|
||||
* method should be called. This causes the periodic task that monitors the time
|
||||
* interval to be canceled. If the {@code ScheduledExecutorService} has been
|
||||
* created by the semaphore at construction time, it is also shut down.
|
||||
* resources. After that {@link #acquire()} must not be called any more.
|
||||
* </p>
|
||||
*
|
||||
* @version $Id:$
|
||||
*/
|
||||
public class TimedSemaphore {
|
||||
/**
|
||||
* Constant for a value representing no limit. If the limit is set to a
|
||||
* value less or equal this constant, the {@code TimedSemaphore} will be
|
||||
* effectively switched off.
|
||||
*/
|
||||
public static final int NO_LIMIT = 0;
|
||||
|
||||
/** Constant for the thread pool size for the executor. */
|
||||
private static final int THREAD_POOL_SIZE = 1;
|
||||
|
||||
/** The executor service for managing the timer thread. */
|
||||
private final ScheduledExecutorService executorService;
|
||||
|
||||
/** Stores the period for this timed semaphore. */
|
||||
private final long period;
|
||||
|
||||
/** The time unit for the period. */
|
||||
private final TimeUnit unit;
|
||||
|
||||
/** A flag whether the executor service was created by this object. */
|
||||
private final boolean ownExecutor;
|
||||
|
||||
/** A future object representing the timer task. */
|
||||
private ScheduledFuture<?> task;
|
||||
|
||||
/** Stores the total number of invocations of the acquire() method. */
|
||||
private long totalAcquireCount;
|
||||
|
||||
/**
|
||||
* The counter for the periods. This counter is increased every time a
|
||||
* period ends.
|
||||
*/
|
||||
private long periodCount;
|
||||
|
||||
/** The limit. */
|
||||
private int limit;
|
||||
|
||||
/** The current counter. */
|
||||
private int acquireCount;
|
||||
|
||||
/** The number of invocations of acquire() in the last period. */
|
||||
private int lastCallsPerPeriod;
|
||||
|
||||
/** A flag whether shutdown() was called. */
|
||||
private boolean shutdown;
|
||||
|
||||
/**
|
||||
* Creates a new instance of {@link TimedSemaphore} and initializes it with
|
||||
* the given time period and the limit.
|
||||
*
|
||||
* @param timePeriod the time period
|
||||
* @param timeUnit the unit for the period
|
||||
* @param limit the limit for the semaphore
|
||||
* @throws IllegalArgumentException if the period is less or equals 0
|
||||
*/
|
||||
public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) {
|
||||
this(null, timePeriod, timeUnit, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of {@link TimedSemaphore} and initializes it with
|
||||
* an executor service, the given time period, and the limit. The executor
|
||||
* service will be used for creating a periodic task for monitoring the time
|
||||
* period. It can be <b>null</b>, then a default service will be created.
|
||||
*
|
||||
* @param service the executor service
|
||||
* @param timePeriod the time period
|
||||
* @param timeUnit the unit for the period
|
||||
* @param limit the limit for the semaphore
|
||||
* @throws IllegalArgumentException if the period is less or equals 0
|
||||
*/
|
||||
public TimedSemaphore(ScheduledExecutorService service, long timePeriod,
|
||||
TimeUnit timeUnit, int limit) {
|
||||
if (timePeriod <= 0) {
|
||||
throw new IllegalArgumentException("Time period must be greater 0!");
|
||||
}
|
||||
|
||||
period = timePeriod;
|
||||
unit = timeUnit;
|
||||
|
||||
if (service != null) {
|
||||
executorService = service;
|
||||
ownExecutor = false;
|
||||
} else {
|
||||
ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
|
||||
THREAD_POOL_SIZE);
|
||||
s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||
s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
executorService = s;
|
||||
ownExecutor = true;
|
||||
}
|
||||
|
||||
setLimit(limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the limit enforced by this semaphore. The limit determines how
|
||||
* many invocations of {@link #acquire()} are allowed within the monitored
|
||||
* period.
|
||||
*
|
||||
* @return the limit
|
||||
*/
|
||||
public final synchronized int getLimit() {
|
||||
return limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the limit. This is the number of times the {@link #acquire()} method
|
||||
* can be called within the time period specified. If this limit is reached,
|
||||
* further invocations of {@link #acquire()} will block. Setting the limit
|
||||
* to a value <= {@link #NO_LIMIT} will cause the limit to be disabled,
|
||||
* i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
|
||||
* the time period.
|
||||
*
|
||||
* @param limit the limit
|
||||
*/
|
||||
public final synchronized void setLimit(int limit) {
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a shutdown. After that the object cannot be used any more.
|
||||
* This method can be invoked an arbitrary number of times. All invocations
|
||||
* after the first one do not have any effect.
|
||||
*/
|
||||
public synchronized void shutdown() {
|
||||
if (!shutdown) {
|
||||
|
||||
if (ownExecutor) {
|
||||
// if the executor was created by this instance, it has
|
||||
// to be shutdown
|
||||
getExecutorService().shutdownNow();
|
||||
}
|
||||
if (task != null) {
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
shutdown = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether the {@link #shutdown()} method has been called on this
|
||||
* object. If this method returns <b>true</b>, this instance cannot be used
|
||||
* any longer.
|
||||
*
|
||||
* @return a flag whether a shutdown has been performed
|
||||
*/
|
||||
public synchronized boolean isShutdown() {
|
||||
return shutdown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire a permit from this semaphore. This method will block if
|
||||
* the limit for the current period has already been reached. If
|
||||
* {@link #shutdown()} has already been invoked, calling this method will
|
||||
* cause an exception. The very first call of this method starts the timer
|
||||
* task which monitors the time period set for this {@code TimedSemaphore}.
|
||||
* From now on the semaphore is active.
|
||||
*
|
||||
* @throws InterruptedException if the thread gets interrupted
|
||||
* @throws IllegalStateException if this semaphore is already shut down
|
||||
*/
|
||||
public synchronized void acquire() throws InterruptedException {
|
||||
if (isShutdown()) {
|
||||
throw new IllegalStateException("TimedSemaphore is shut down!");
|
||||
}
|
||||
|
||||
if (task == null) {
|
||||
task = startTimer();
|
||||
}
|
||||
|
||||
boolean canPass = false;
|
||||
do {
|
||||
canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
|
||||
if (!canPass) {
|
||||
wait();
|
||||
} else {
|
||||
acquireCount++;
|
||||
}
|
||||
} while (!canPass);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of (successful) acquire invocations during the last
|
||||
* period. This is the number of times the {@link #acquire()} method was
|
||||
* called without blocking. This can be useful for testing or debugging
|
||||
* purposes or to determine a meaningful threshold value. If a limit is set,
|
||||
* the value returned by this method won't be greater than this limit.
|
||||
*
|
||||
* @return the number of non-blocking invocations of the {@link #acquire()}
|
||||
* method
|
||||
*/
|
||||
public synchronized int getLastAcquiresPerPeriod() {
|
||||
return lastCallsPerPeriod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of invocations of the {@link #acquire()} method for
|
||||
* the current period. This may be useful for testing or debugging purposes.
|
||||
*
|
||||
* @return the current number of {@link #acquire()} invocations
|
||||
*/
|
||||
public synchronized int getAcquireCount() {
|
||||
return acquireCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of calls to the {@link #acquire()} method that can
|
||||
* still be performed in the current period without blocking. This method
|
||||
* can give an indication whether it is safe to call the {@link #acquire()}
|
||||
* method without risking to be suspended. However, there is no guarantee
|
||||
* that a subsequent call to {@link #acquire()} actually is not-blocking
|
||||
* because in the mean time other threads may have invoked the semaphore.
|
||||
*
|
||||
* @return the current number of available {@link #acquire()} calls in the
|
||||
* current period
|
||||
*/
|
||||
public synchronized int getAvailablePermits() {
|
||||
return getLimit() - getAcquireCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the average number of successful (i.e. non-blocking)
|
||||
* {@link #acquire()} invocations for the entire life-time of this {@code
|
||||
* TimedSemaphore}. This method can be used for instance for statistical
|
||||
* calculations.
|
||||
*
|
||||
* @return the average number of {@link #acquire()} invocations per time
|
||||
* unit
|
||||
*/
|
||||
public synchronized double getAverageCallsPerPeriod() {
|
||||
return (periodCount == 0) ? 0 : (double) totalAcquireCount
|
||||
/ (double) periodCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the time period. This is the time monitored by this semaphore.
|
||||
* Only a given number of invocations of the {@link #acquire()} method is
|
||||
* possible in this period.
|
||||
*
|
||||
* @return the time period
|
||||
*/
|
||||
public long getPeriod() {
|
||||
return period;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the time unit. This is the unit used by {@link #getPeriod()}.
|
||||
*
|
||||
* @return the time unit
|
||||
*/
|
||||
public TimeUnit getUnit() {
|
||||
return unit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the executor service used by this instance.
|
||||
*
|
||||
* @return the executor service
|
||||
*/
|
||||
protected ScheduledExecutorService getExecutorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the timer. This method is called when {@link #acquire()} is called
|
||||
* for the first time. It schedules a task to be executed at fixed rate to
|
||||
* monitor the time period specified.
|
||||
*
|
||||
* @return a future object representing the task scheduled
|
||||
*/
|
||||
protected ScheduledFuture<?> startTimer() {
|
||||
return getExecutorService().scheduleAtFixedRate(new Runnable() {
|
||||
public void run() {
|
||||
endOfPeriod();
|
||||
}
|
||||
}, getPeriod(), getPeriod(), getUnit());
|
||||
}
|
||||
|
||||
/**
|
||||
* The current time period is finished. This method is called by the timer
|
||||
* used internally to monitor the time period. It resets the counter and
|
||||
* releases the threads waiting for this barrier.
|
||||
*/
|
||||
synchronized void endOfPeriod() {
|
||||
lastCallsPerPeriod = acquireCount;
|
||||
totalAcquireCount += acquireCount;
|
||||
periodCount++;
|
||||
acquireCount = 0;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,481 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.commons.lang3.concurrent;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.TimedSemaphore;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test class for TimedSemaphore.
|
||||
*
|
||||
* @version $Id$
|
||||
*/
|
||||
public class TimedSemaphoreTest {
|
||||
/** Constant for the time period. */
|
||||
private static final long PERIOD = 500;
|
||||
|
||||
/** Constant for the time unit. */
|
||||
private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
|
||||
|
||||
/** Constant for the default limit. */
|
||||
private static final int LIMIT = 10;
|
||||
|
||||
/**
|
||||
* Tests creating a new instance.
|
||||
*/
|
||||
@Test
|
||||
public void testInit() {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
EasyMock.replay(service);
|
||||
TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
|
||||
LIMIT);
|
||||
EasyMock.verify(service);
|
||||
assertEquals("Wrong service", service, semaphore.getExecutorService());
|
||||
assertEquals("Wrong period", PERIOD, semaphore.getPeriod());
|
||||
assertEquals("Wrong unit", UNIT, semaphore.getUnit());
|
||||
assertEquals("Statistic available", 0, semaphore
|
||||
.getLastAcquiresPerPeriod());
|
||||
assertEquals("Average available", 0.0, semaphore
|
||||
.getAverageCallsPerPeriod(), .05);
|
||||
assertFalse("Already shutdown", semaphore.isShutdown());
|
||||
assertEquals("Wrong limit", LIMIT, semaphore.getLimit());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to create an instance with a negative period. This should cause an
|
||||
* exception.
|
||||
*/
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testInitInvalidPeriod() {
|
||||
new TimedSemaphore(0L, UNIT, LIMIT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether a default executor service is created if no service is
|
||||
* provided.
|
||||
*/
|
||||
@Test
|
||||
public void testInitDefaultService() {
|
||||
TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
|
||||
ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
|
||||
.getExecutorService();
|
||||
assertFalse("Wrong periodic task policy", exec
|
||||
.getContinueExistingPeriodicTasksAfterShutdownPolicy());
|
||||
assertFalse("Wrong delayed task policy", exec
|
||||
.getExecuteExistingDelayedTasksAfterShutdownPolicy());
|
||||
assertFalse("Already shutdown", exec.isShutdown());
|
||||
semaphore.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests starting the timer.
|
||||
*/
|
||||
@Test
|
||||
public void testStartTimer() throws InterruptedException {
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD,
|
||||
UNIT, LIMIT);
|
||||
ScheduledFuture<?> future = semaphore.startTimer();
|
||||
assertNotNull("No future returned", future);
|
||||
Thread.sleep(PERIOD);
|
||||
final int trials = 10;
|
||||
int count = 0;
|
||||
do {
|
||||
Thread.sleep(PERIOD);
|
||||
if (count++ > trials) {
|
||||
fail("endOfPeriod() not called!");
|
||||
}
|
||||
} while (semaphore.getPeriodEnds() <= 0);
|
||||
semaphore.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the shutdown() method if the executor belongs to the semaphore. In
|
||||
* this case it has to be shut down.
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownOwnExecutor() {
|
||||
TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
|
||||
semaphore.shutdown();
|
||||
assertTrue("Not shutdown", semaphore.isShutdown());
|
||||
assertTrue("Executor not shutdown", semaphore.getExecutorService()
|
||||
.isShutdown());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the shutdown() method for a shared executor service before a task
|
||||
* was started. This should do pretty much nothing.
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownSharedExecutorNoTask() {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
EasyMock.replay(service);
|
||||
TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
|
||||
LIMIT);
|
||||
semaphore.shutdown();
|
||||
assertTrue("Not shutdown", semaphore.isShutdown());
|
||||
EasyMock.verify(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares an executor service mock to expect the start of the timer.
|
||||
*
|
||||
* @param service the mock
|
||||
* @param future the future
|
||||
*/
|
||||
private void prepareStartTimer(ScheduledExecutorService service,
|
||||
ScheduledFuture<?> future) {
|
||||
service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
|
||||
.eq(PERIOD), EasyMock.eq(PERIOD), EasyMock.eq(UNIT));
|
||||
EasyMock.expectLastCall().andReturn(future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the shutdown() method for a shared executor after the task was
|
||||
* started. In this case the task must be canceled.
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownSharedExecutorTask() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.expect(future.cancel(false)).andReturn(true);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
|
||||
PERIOD, UNIT, LIMIT);
|
||||
semaphore.acquire();
|
||||
semaphore.shutdown();
|
||||
assertTrue("Not shutdown", semaphore.isShutdown());
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests multiple invocations of the shutdown() method.
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownMultipleTimes() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.expect(future.cancel(false)).andReturn(true);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
|
||||
PERIOD, UNIT, LIMIT);
|
||||
semaphore.acquire();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
semaphore.shutdown();
|
||||
}
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the acquire() method if a limit is set.
|
||||
*/
|
||||
@Test
|
||||
public void testAcquireLimit() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.replay(service, future);
|
||||
final int count = 10;
|
||||
CountDownLatch latch = new CountDownLatch(count - 1);
|
||||
TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 1);
|
||||
SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
|
||||
count - 1);
|
||||
semaphore.setLimit(count - 1);
|
||||
|
||||
// start a thread that calls the semaphore count times
|
||||
t.start();
|
||||
latch.await();
|
||||
// now the semaphore's limit should be reached and the thread blocked
|
||||
assertEquals("Wrong semaphore count", count - 1, semaphore
|
||||
.getAcquireCount());
|
||||
|
||||
// this wakes up the thread, it should call the semaphore once more
|
||||
semaphore.endOfPeriod();
|
||||
t.join();
|
||||
assertEquals("Wrong semaphore count (2)", 1, semaphore
|
||||
.getAcquireCount());
|
||||
assertEquals("Wrong acquire() count", count - 1, semaphore
|
||||
.getLastAcquiresPerPeriod());
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the acquire() method if more threads are involved than the limit.
|
||||
* This method starts a number of threads that all invoke the semaphore. The
|
||||
* semaphore's limit is set to 1, so in each period only a single thread can
|
||||
* acquire the semaphore.
|
||||
*/
|
||||
@Test
|
||||
public void testAcquireMultipleThreads() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
|
||||
PERIOD, UNIT, 1);
|
||||
semaphore.latch = new CountDownLatch(1);
|
||||
final int count = 10;
|
||||
SemaphoreThread[] threads = new SemaphoreThread[count];
|
||||
for (int i = 0; i < count; i++) {
|
||||
threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
|
||||
threads[i].start();
|
||||
}
|
||||
for (int i = 0; i < count; i++) {
|
||||
semaphore.latch.await();
|
||||
assertEquals("Wrong count", 1, semaphore.getAcquireCount());
|
||||
semaphore.latch = new CountDownLatch(1);
|
||||
semaphore.endOfPeriod();
|
||||
assertEquals("Wrong acquire count", 1, semaphore
|
||||
.getLastAcquiresPerPeriod());
|
||||
}
|
||||
for (int i = 0; i < count; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the acquire() method if no limit is set. A test thread is started
|
||||
* that calls the semaphore a large number of times. Even if the semaphore's
|
||||
* period does not end, the thread should never block.
|
||||
*/
|
||||
@Test
|
||||
public void testAcquireNoLimit() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
|
||||
PERIOD, UNIT, TimedSemaphore.NO_LIMIT);
|
||||
final int count = 1000;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
|
||||
t.start();
|
||||
latch.await();
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to call acquire() after shutdown(). This should cause an exception.
|
||||
*/
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testPassAfterShutdown() throws InterruptedException {
|
||||
TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
|
||||
semaphore.shutdown();
|
||||
semaphore.acquire();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests a bigger number of invocations that span multiple periods. The
|
||||
* period is set to a very short time. A background thread calls the
|
||||
* semaphore a large number of times. While it runs at last one end of a
|
||||
* period should be reached.
|
||||
*/
|
||||
@Test
|
||||
public void testAcquireMultiplePeriods() throws InterruptedException {
|
||||
final int count = 1000;
|
||||
TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
|
||||
PERIOD / 10, TimeUnit.MILLISECONDS, 1);
|
||||
semaphore.setLimit(count / 4);
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
|
||||
t.start();
|
||||
latch.await();
|
||||
semaphore.shutdown();
|
||||
assertTrue("End of period not reached", semaphore.getPeriodEnds() > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the methods for statistics.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAverageCallsPerPeriod() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
|
||||
LIMIT);
|
||||
semaphore.acquire();
|
||||
semaphore.endOfPeriod();
|
||||
assertEquals("Wrong average (1)", 1.0, semaphore
|
||||
.getAverageCallsPerPeriod(), .005);
|
||||
semaphore.acquire();
|
||||
semaphore.acquire();
|
||||
semaphore.endOfPeriod();
|
||||
assertEquals("Wrong average (2)", 1.5, semaphore
|
||||
.getAverageCallsPerPeriod(), .005);
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether the available non-blocking calls can be queried.
|
||||
*/
|
||||
@Test
|
||||
public void testGetAvailablePermits() throws InterruptedException {
|
||||
ScheduledExecutorService service = EasyMock
|
||||
.createMock(ScheduledExecutorService.class);
|
||||
ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
|
||||
prepareStartTimer(service, future);
|
||||
EasyMock.replay(service, future);
|
||||
TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
|
||||
LIMIT);
|
||||
for (int i = 0; i < LIMIT; i++) {
|
||||
assertEquals("Wrong available count at " + i, LIMIT - i, semaphore
|
||||
.getAvailablePermits());
|
||||
semaphore.acquire();
|
||||
}
|
||||
semaphore.endOfPeriod();
|
||||
assertEquals("Wrong available count in new period", LIMIT, semaphore
|
||||
.getAvailablePermits());
|
||||
EasyMock.verify(service, future);
|
||||
}
|
||||
|
||||
/**
|
||||
* A specialized implementation of {@code TimedSemaphore} that is easier to
|
||||
* test.
|
||||
*/
|
||||
private static class TimedSemaphoreTestImpl extends TimedSemaphore {
|
||||
/** A mock scheduled future. */
|
||||
ScheduledFuture<?> schedFuture;
|
||||
|
||||
/** A latch for synchronizing with the main thread. */
|
||||
volatile CountDownLatch latch;
|
||||
|
||||
/** Counter for the endOfPeriod() invocations. */
|
||||
private int periodEnds;
|
||||
|
||||
public TimedSemaphoreTestImpl(long timePeriod, TimeUnit timeUnit,
|
||||
int limit) {
|
||||
super(timePeriod, timeUnit, limit);
|
||||
}
|
||||
|
||||
public TimedSemaphoreTestImpl(ScheduledExecutorService service,
|
||||
long timePeriod, TimeUnit timeUnit, int limit) {
|
||||
super(service, timePeriod, timeUnit, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of invocations of the endOfPeriod() method.
|
||||
*
|
||||
* @return the endOfPeriod() invocations
|
||||
*/
|
||||
public int getPeriodEnds() {
|
||||
synchronized (this) {
|
||||
return periodEnds;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the latch if one is set.
|
||||
*/
|
||||
@Override
|
||||
public void acquire() throws InterruptedException {
|
||||
super.acquire();
|
||||
if (latch != null) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts the number of invocations.
|
||||
*/
|
||||
@Override
|
||||
protected void endOfPeriod() {
|
||||
super.endOfPeriod();
|
||||
synchronized (this) {
|
||||
periodEnds++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Either returns the mock future or calls the super method.
|
||||
*/
|
||||
@Override
|
||||
protected ScheduledFuture<?> startTimer() {
|
||||
return (schedFuture != null) ? schedFuture : super.startTimer();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A test thread class that will be used by tests for triggering the
|
||||
* semaphore. The thread calls the semaphore a configurable number of times.
|
||||
* When this is done, it can notify the main thread.
|
||||
*/
|
||||
private static class SemaphoreThread extends Thread {
|
||||
/** The semaphore. */
|
||||
private final TimedSemaphore semaphore;
|
||||
|
||||
/** A latch for communication with the main thread. */
|
||||
private final CountDownLatch latch;
|
||||
|
||||
/** The number of acquire() calls. */
|
||||
private final int count;
|
||||
|
||||
/** The number of invocations of the latch. */
|
||||
private final int latchCount;
|
||||
|
||||
public SemaphoreThread(TimedSemaphore b, CountDownLatch l, int c, int lc) {
|
||||
semaphore = b;
|
||||
latch = l;
|
||||
count = c;
|
||||
latchCount = lc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls acquire() on the semaphore for the specified number of times.
|
||||
* Optionally the latch will also be triggered to synchronize with the
|
||||
* main test thread.
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
for (int i = 0; i < count; i++) {
|
||||
semaphore.acquire();
|
||||
|
||||
if (i < latchCount) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException iex) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue