HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks - add new files
This commit is contained in:
parent
538388c2b5
commit
d3eedb24e1
|
@ -0,0 +1,368 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
|
||||||
|
* periodically while sharing threads. The ChoreService is backed by a
|
||||||
|
* {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
|
||||||
|
* number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
|
||||||
|
* underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
|
||||||
|
* <p>
|
||||||
|
* The ChoreService provides the ability to schedule, cancel, and trigger instances of
|
||||||
|
* {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
|
||||||
|
* scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
|
||||||
|
* load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
|
||||||
|
* there may be a need to increase the number of threads if it is noticed that chores are no longer
|
||||||
|
* meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
|
||||||
|
* made to reduce the number of running threads to see if chores can still meet their start times
|
||||||
|
* with a smaller thread pool.
|
||||||
|
* <p>
|
||||||
|
* When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
|
||||||
|
* Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ChoreService implements ChoreServicer {
|
||||||
|
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
|
||||||
|
*/
|
||||||
|
public final static int MIN_CORE_POOL_SIZE = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This thread pool is used to schedule all of the Chores
|
||||||
|
*/
|
||||||
|
private final ScheduledThreadPoolExecutor scheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps chores to their futures. Futures are used to control a chore's schedule
|
||||||
|
*/
|
||||||
|
private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
|
||||||
|
* core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
|
||||||
|
* increase the core pool size by 1 (otherwise a single long running chore whose execution is
|
||||||
|
* longer than its period would be able to spawn too many threads).
|
||||||
|
*/
|
||||||
|
private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
|
||||||
|
* ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
|
||||||
|
* running on. The prefix is useful because it allows us to monitor how the thread pool of a
|
||||||
|
* particular service changes over time VIA thread dumps.
|
||||||
|
*/
|
||||||
|
private final String coreThreadPoolPrefix;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
|
||||||
|
* spawned by this service
|
||||||
|
*/
|
||||||
|
public ChoreService(final String coreThreadPoolPrefix) {
|
||||||
|
this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
|
||||||
|
* spawned by this service
|
||||||
|
* @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
|
||||||
|
* to during initialization. The default size is 1, but specifying a larger size may be
|
||||||
|
* beneficial if you know that 1 thread will not be enough.
|
||||||
|
*/
|
||||||
|
public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
|
||||||
|
this.coreThreadPoolPrefix = coreThreadPoolPrefix;
|
||||||
|
if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
|
||||||
|
final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
|
||||||
|
scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
|
||||||
|
scheduler.setRemoveOnCancelPolicy(true);
|
||||||
|
scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
|
||||||
|
choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
|
||||||
|
* spawned by this service
|
||||||
|
*/
|
||||||
|
public static ChoreService getInstance(final String coreThreadPoolPrefix) {
|
||||||
|
return new ChoreService(coreThreadPoolPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
|
||||||
|
* instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
|
||||||
|
* with a single ChoreService instance).
|
||||||
|
* @return true when the chore was successfully scheduled. false when the scheduling failed
|
||||||
|
* (typically occurs when a chore is scheduled during shutdown of service)
|
||||||
|
*/
|
||||||
|
public synchronized boolean scheduleChore(ScheduledChore chore) {
|
||||||
|
if (chore == null) return false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
ScheduledFuture<?> future =
|
||||||
|
scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
|
||||||
|
chore.getTimeUnit());
|
||||||
|
chore.setChoreServicer(this);
|
||||||
|
scheduledChores.put(chore, future);
|
||||||
|
return true;
|
||||||
|
} catch (Exception exception) {
|
||||||
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Could not successfully schedule chore: " + chore.getName());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
|
||||||
|
* yet then this call is equivalent to a call to scheduleChore.
|
||||||
|
*/
|
||||||
|
private synchronized void rescheduleChore(ScheduledChore chore) {
|
||||||
|
if (chore == null) return;
|
||||||
|
|
||||||
|
if (scheduledChores.containsKey(chore)) {
|
||||||
|
ScheduledFuture<?> future = scheduledChores.get(chore);
|
||||||
|
future.cancel(false);
|
||||||
|
}
|
||||||
|
scheduleChore(chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void cancelChore(ScheduledChore chore) {
|
||||||
|
cancelChore(chore, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
|
||||||
|
if (chore != null && scheduledChores.containsKey(chore)) {
|
||||||
|
ScheduledFuture<?> future = scheduledChores.get(chore);
|
||||||
|
future.cancel(mayInterruptIfRunning);
|
||||||
|
scheduledChores.remove(chore);
|
||||||
|
|
||||||
|
// Removing a chore that was missing its start time means it may be possible
|
||||||
|
// to reduce the number of threads
|
||||||
|
if (choresMissingStartTime.containsKey(chore)) {
|
||||||
|
choresMissingStartTime.remove(chore);
|
||||||
|
requestCorePoolDecrease();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean isChoreScheduled(ScheduledChore chore) {
|
||||||
|
return chore != null && scheduledChores.containsKey(chore)
|
||||||
|
&& !scheduledChores.get(chore).isDone();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean triggerNow(ScheduledChore chore) {
|
||||||
|
if (chore == null) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
rescheduleChore(chore);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of chores that this service currently has scheduled
|
||||||
|
*/
|
||||||
|
int getNumberOfScheduledChores() {
|
||||||
|
return scheduledChores.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of chores that this service currently has scheduled that are missing their
|
||||||
|
* scheduled start time
|
||||||
|
*/
|
||||||
|
int getNumberOfChoresMissingStartTime() {
|
||||||
|
return choresMissingStartTime.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
|
||||||
|
*/
|
||||||
|
int getCorePoolSize() {
|
||||||
|
return scheduler.getCorePoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
|
||||||
|
* daemon threads, and thus, don't prevent the JVM from shutting down
|
||||||
|
*/
|
||||||
|
static class ChoreServiceThreadFactory implements ThreadFactory {
|
||||||
|
private final String threadPrefix;
|
||||||
|
private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
|
||||||
|
private AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param threadPrefix The prefix given to all threads created by this factory
|
||||||
|
*/
|
||||||
|
public ChoreServiceThreadFactory(final String threadPrefix) {
|
||||||
|
this.threadPrefix = threadPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread thread =
|
||||||
|
new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a request to increase the number of core pool threads. Typically a request
|
||||||
|
* originates from the fact that the current core pool size is not sufficient to service all of
|
||||||
|
* the currently running Chores
|
||||||
|
* @return true when the request to increase the core pool size succeeds
|
||||||
|
*/
|
||||||
|
private synchronized boolean requestCorePoolIncrease() {
|
||||||
|
// There is no point in creating more threads than scheduledChores.size since scheduled runs
|
||||||
|
// of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
|
||||||
|
// amongst occurrences of the same chore).
|
||||||
|
if (scheduler.getCorePoolSize() < scheduledChores.size()) {
|
||||||
|
scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
|
||||||
|
printChoreServiceDetails("requestCorePoolIncrease");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a request to decrease the number of core pool threads. Typically a request
|
||||||
|
* originates from the fact that the current core pool size is more than sufficient to service the
|
||||||
|
* running Chores.
|
||||||
|
*/
|
||||||
|
private synchronized void requestCorePoolDecrease() {
|
||||||
|
if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
|
||||||
|
scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
|
||||||
|
printChoreServiceDetails("requestCorePoolDecrease");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
|
||||||
|
if (chore == null || !scheduledChores.containsKey(chore)) return;
|
||||||
|
|
||||||
|
// If the chore has not caused an increase in the size of the core thread pool then request an
|
||||||
|
// increase. This allows each chore missing its start time to increase the core pool size by
|
||||||
|
// at most 1.
|
||||||
|
if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
|
||||||
|
choresMissingStartTime.put(chore, requestCorePoolIncrease());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
|
||||||
|
// the chore is NOT rescheduled, future executions of this chore will be delayed more and
|
||||||
|
// more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
|
||||||
|
// idle threads to chores based on how delayed they are.
|
||||||
|
rescheduleChore(chore);
|
||||||
|
printChoreDetails("onChoreMissedStartTime", chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
|
||||||
|
* in the middle of execution will be interrupted and shutdown. This service will be unusable
|
||||||
|
* after this method has been called (i.e. future scheduling attempts will fail).
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
List<Runnable> ongoing = scheduler.shutdownNow();
|
||||||
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + ongoing + " on shutdown");
|
||||||
|
}
|
||||||
|
cancelAllChores(true);
|
||||||
|
scheduledChores.clear();
|
||||||
|
choresMissingStartTime.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when the service is shutdown and thus cannot be used anymore
|
||||||
|
*/
|
||||||
|
public boolean isShutdown() {
|
||||||
|
return scheduler.isShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when the service is shutdown and all threads have terminated
|
||||||
|
*/
|
||||||
|
public boolean isTerminated() {
|
||||||
|
return scheduler.isTerminated();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cancelAllChores(final boolean mayInterruptIfRunning) {
|
||||||
|
ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
|
||||||
|
// Build list of chores to cancel so we can iterate through a set that won't change
|
||||||
|
// as chores are cancelled. If we tried to cancel each chore while iterating through
|
||||||
|
// keySet the results would be undefined because the keySet would be changing
|
||||||
|
for (ScheduledChore chore : scheduledChores.keySet()) {
|
||||||
|
choresToCancel.add(chore);
|
||||||
|
}
|
||||||
|
for (ScheduledChore chore : choresToCancel) {
|
||||||
|
chore.cancel(mayInterruptIfRunning);
|
||||||
|
}
|
||||||
|
choresToCancel.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prints a summary of important details about the chore. Used for debugging purposes
|
||||||
|
*/
|
||||||
|
private void printChoreDetails(final String header, ScheduledChore chore) {
|
||||||
|
LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
|
||||||
|
output.put(header, "");
|
||||||
|
output.put("Chore name: ", chore.getName());
|
||||||
|
output.put("Chore period: ", Integer.toString(chore.getPeriod()));
|
||||||
|
output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
|
||||||
|
|
||||||
|
for (Entry<String, String> entry : output.entrySet()) {
|
||||||
|
if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prints a summary of important details about the service. Used for debugging purposes
|
||||||
|
*/
|
||||||
|
private void printChoreServiceDetails(final String header) {
|
||||||
|
LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
|
||||||
|
output.put(header, "");
|
||||||
|
output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
|
||||||
|
output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
|
||||||
|
output.put("ChoreService missingStartTimeCount: ",
|
||||||
|
Integer.toString(getNumberOfChoresMissingStartTime()));
|
||||||
|
|
||||||
|
for (Entry<String, String> entry : output.entrySet()) {
|
||||||
|
if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,330 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
|
||||||
|
* scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
|
||||||
|
* chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
|
||||||
|
* access to the threads in the core thread pool. If an unhandled exception occurs, the chore
|
||||||
|
* cancellation is logged. Implementers should consider whether or not the Chore will be able to
|
||||||
|
* execute within the defined period. It is bad practice to define a ScheduledChore whose execution
|
||||||
|
* time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
|
||||||
|
* thread pool.
|
||||||
|
* <p>
|
||||||
|
* Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
|
||||||
|
* an entry being added to a queue, etc.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public abstract class ScheduledChore implements Runnable {
|
||||||
|
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default values for scheduling parameters should they be excluded during construction
|
||||||
|
*/
|
||||||
|
private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
|
||||||
|
private final static long DEFAULT_INITIAL_DELAY = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
|
||||||
|
*/
|
||||||
|
private final int period;
|
||||||
|
private final TimeUnit timeUnit;
|
||||||
|
private final long initialDelay;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
|
||||||
|
* not scheduled.
|
||||||
|
*/
|
||||||
|
private ChoreServicer choreServicer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Variables that encapsulate the meaningful state information
|
||||||
|
*/
|
||||||
|
private long timeOfLastRun = -1;
|
||||||
|
private long timeOfThisRun = -1;
|
||||||
|
private boolean initialChoreComplete = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
|
||||||
|
* stopped, it will cancel itself. This is particularly useful in the case where a single stopper
|
||||||
|
* instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
|
||||||
|
* command can cause many chores to stop together.
|
||||||
|
*/
|
||||||
|
private final Stoppable stopper;
|
||||||
|
|
||||||
|
interface ChoreServicer {
|
||||||
|
/**
|
||||||
|
* Cancel any ongoing schedules that this chore has with the implementer of this interface.
|
||||||
|
*/
|
||||||
|
public void cancelChore(ScheduledChore chore);
|
||||||
|
public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when the chore is scheduled with the implementer of this interface
|
||||||
|
*/
|
||||||
|
public boolean isChoreScheduled(ScheduledChore chore);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method tries to execute the chore immediately. If the chore is executing at the time of
|
||||||
|
* this call, the chore will begin another execution as soon as the current execution finishes
|
||||||
|
* <p>
|
||||||
|
* If the chore is not scheduled with a ChoreService, this call will fail.
|
||||||
|
* @return false when the chore could not be triggered immediately
|
||||||
|
*/
|
||||||
|
public boolean triggerNow(ScheduledChore chore);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A callback that tells the implementer of this interface that one of the scheduled chores is
|
||||||
|
* missing its start time. The implication of a chore missing its start time is that the
|
||||||
|
* service's current means of scheduling may not be sufficient to handle the number of ongoing
|
||||||
|
* chores (the other explanation is that the chore's execution time is greater than its
|
||||||
|
* scheduled period). The service should try to increase its concurrency when this callback is
|
||||||
|
* received.
|
||||||
|
* @param chore The chore that missed its start time
|
||||||
|
*/
|
||||||
|
public void onChoreMissedStartTime(ScheduledChore chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This constructor is for test only. It allows us to create an object and to call chore() on it.
|
||||||
|
*/
|
||||||
|
protected ScheduledChore() {
|
||||||
|
this.name = null;
|
||||||
|
this.stopper = null;
|
||||||
|
this.period = 0;
|
||||||
|
this.initialDelay = DEFAULT_INITIAL_DELAY;
|
||||||
|
this.timeUnit = DEFAULT_TIME_UNIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param name Name assigned to Chore. Useful for identification amongst chores of the same type
|
||||||
|
* @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
|
||||||
|
* @param period Period with which this Chore repeats execution when scheduled.
|
||||||
|
*/
|
||||||
|
public ScheduledChore(final String name, Stoppable stopper, final int period) {
|
||||||
|
this(name, stopper, period, DEFAULT_INITIAL_DELAY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param name Name assigned to Chore. Useful for identification amongst chores of the same type
|
||||||
|
* @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
|
||||||
|
* @param period Period with which this Chore repeats execution when scheduled.
|
||||||
|
* @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
|
||||||
|
* value of 0 means the chore will begin to execute immediately. Negative delays are
|
||||||
|
* invalid and will be corrected to a value of 0.
|
||||||
|
*/
|
||||||
|
public ScheduledChore(final String name, Stoppable stopper, final int period,
|
||||||
|
final long initialDelay) {
|
||||||
|
this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param name Name assigned to Chore. Useful for identification amongst chores of the same type
|
||||||
|
* @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
|
||||||
|
* @param period Period with which this Chore repeats execution when scheduled.
|
||||||
|
* @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
|
||||||
|
* value of 0 means the chore will begin to execute immediately. Negative delays are
|
||||||
|
* invalid and will be corrected to a value of 0.
|
||||||
|
* @param unit The unit that is used to measure period and initialDelay
|
||||||
|
*/
|
||||||
|
public ScheduledChore(final String name, Stoppable stopper, final int period,
|
||||||
|
final long initialDelay, final TimeUnit unit) {
|
||||||
|
this.name = name;
|
||||||
|
this.stopper = stopper;
|
||||||
|
this.period = period;
|
||||||
|
this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
|
||||||
|
this.timeUnit = unit;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void resetState() {
|
||||||
|
timeOfLastRun = -1;
|
||||||
|
timeOfThisRun = -1;
|
||||||
|
initialChoreComplete = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see java.lang.Thread#run()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
timeOfLastRun = timeOfThisRun;
|
||||||
|
timeOfThisRun = System.currentTimeMillis();
|
||||||
|
if (missedStartTime() && choreServicer != null) {
|
||||||
|
choreServicer.onChoreMissedStartTime(this);
|
||||||
|
if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
|
||||||
|
} else if (stopper.isStopped()) {
|
||||||
|
cancel();
|
||||||
|
cleanup();
|
||||||
|
LOG.info("Chore: " + getName() + " was stopped");
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
if (!initialChoreComplete) {
|
||||||
|
initialChoreComplete = initialChore();
|
||||||
|
} else {
|
||||||
|
chore();
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Caught error", t);
|
||||||
|
if (this.stopper.isStopped()) {
|
||||||
|
cancel();
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return How long has it been since this chore last run. Useful for checking if the chore has
|
||||||
|
* missed its scheduled start time by too large of a margin
|
||||||
|
*/
|
||||||
|
synchronized long getTimeBetweenRuns() {
|
||||||
|
return timeOfThisRun - timeOfLastRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when the time between runs exceeds the acceptable threshold
|
||||||
|
*/
|
||||||
|
private synchronized boolean missedStartTime() {
|
||||||
|
return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
|
||||||
|
&& getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized double getMaximumAllowedTimeBetweenRuns() {
|
||||||
|
// Threshold used to determine if the Chore's current run started too late
|
||||||
|
return 1.5 * period;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized boolean isValidTime(final long time) {
|
||||||
|
return time > 0 && time <= System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return false when the Chore is not currently scheduled with a ChoreService
|
||||||
|
*/
|
||||||
|
public synchronized boolean triggerNow() {
|
||||||
|
if (choreServicer != null) {
|
||||||
|
return choreServicer.triggerNow(this);
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void setChoreServicer(ChoreServicer service) {
|
||||||
|
// Chores should only ever be scheduled with a single ChoreService. If the choreServicer
|
||||||
|
// is changing, cancel any existing schedules of this chore.
|
||||||
|
if (choreServicer != null && choreServicer != service) {
|
||||||
|
choreServicer.cancelChore(this, false);
|
||||||
|
}
|
||||||
|
choreServicer = service;
|
||||||
|
timeOfThisRun = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void cancel() {
|
||||||
|
cancel(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void cancel(boolean mayInterruptIfRunning) {
|
||||||
|
if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning);
|
||||||
|
|
||||||
|
choreServicer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Stoppable getStopper() {
|
||||||
|
return stopper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized int getPeriod() {
|
||||||
|
return period;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized long getInitialDelay() {
|
||||||
|
return initialDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final synchronized TimeUnit getTimeUnit() {
|
||||||
|
return timeUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean isInitialChoreComplete() {
|
||||||
|
return initialChoreComplete;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized ChoreServicer getChoreServicer() {
|
||||||
|
return choreServicer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized long getTimeOfLastRun() {
|
||||||
|
return timeOfLastRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized long getTimeOfThisRun() {
|
||||||
|
return timeOfThisRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when this Chore is scheduled with a ChoreService
|
||||||
|
*/
|
||||||
|
public synchronized boolean isScheduled() {
|
||||||
|
return choreServicer != null && choreServicer.isChoreScheduled(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized void choreForTesting() {
|
||||||
|
chore();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The task to execute on each scheduled execution of the Chore
|
||||||
|
*/
|
||||||
|
protected abstract void chore();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override to run a task before we start looping.
|
||||||
|
* @return true if initial chore was successful
|
||||||
|
*/
|
||||||
|
protected synchronized boolean initialChore() {
|
||||||
|
// Default does nothing
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override to run cleanup tasks when the Chore encounters an error and must stop running
|
||||||
|
*/
|
||||||
|
protected synchronized void cleanup() {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,844 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore;
|
||||||
|
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestChoreService {
|
||||||
|
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||||
|
private final String TEST_SERVER_NAME = "testServerName";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A few ScheduledChore samples that are useful for testing with ChoreService
|
||||||
|
*/
|
||||||
|
public static class ScheduledChoreSamples {
|
||||||
|
/**
|
||||||
|
* Straight forward stopper implementation that is used by default when one is not provided
|
||||||
|
*/
|
||||||
|
public static class SampleStopper implements Stoppable {
|
||||||
|
private boolean stopped = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(String why) {
|
||||||
|
stopped = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStopped() {
|
||||||
|
return stopped;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
|
||||||
|
* executions
|
||||||
|
*/
|
||||||
|
public static class SlowChore extends ScheduledChore {
|
||||||
|
public SlowChore(String name, int period) {
|
||||||
|
this(name, new SampleStopper(), period);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SlowChore(String name, Stoppable stopper, int period) {
|
||||||
|
super(name, stopper, period);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean initialChore() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(getPeriod() * 2);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(getPeriod() * 2);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
|
||||||
|
*/
|
||||||
|
public static class DoNothingChore extends ScheduledChore {
|
||||||
|
public DoNothingChore(String name, int period) {
|
||||||
|
super(name, new SampleStopper(), period);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DoNothingChore(String name, Stoppable stopper, int period) {
|
||||||
|
super(name, stopper, period);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class SleepingChore extends ScheduledChore {
|
||||||
|
private int sleepTime;
|
||||||
|
|
||||||
|
public SleepingChore(String name, int chorePeriod, int sleepTime) {
|
||||||
|
this(name, new SampleStopper(), chorePeriod, sleepTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
|
||||||
|
super(name, stopper, period);
|
||||||
|
this.sleepTime = sleepTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean initialChore() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println(e.getStackTrace());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CountingChore extends ScheduledChore {
|
||||||
|
private int countOfChoreCalls;
|
||||||
|
private boolean outputOnTicks = false;
|
||||||
|
|
||||||
|
public CountingChore(String name, int period) {
|
||||||
|
this(name, new SampleStopper(), period);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CountingChore(String name, Stoppable stopper, int period) {
|
||||||
|
this(name, stopper, period, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CountingChore(String name, Stoppable stopper, int period,
|
||||||
|
final boolean outputOnTicks) {
|
||||||
|
super(name, stopper, period);
|
||||||
|
this.countOfChoreCalls = 0;
|
||||||
|
this.outputOnTicks = outputOnTicks;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean initialChore() {
|
||||||
|
countOfChoreCalls++;
|
||||||
|
if (outputOnTicks) outputTickCount();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
countOfChoreCalls++;
|
||||||
|
if (outputOnTicks) outputTickCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void outputTickCount() {
|
||||||
|
System.out.println("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCountOfChoreCalls() {
|
||||||
|
return countOfChoreCalls;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isOutputtingOnTicks() {
|
||||||
|
return outputOnTicks;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOutputOnTicks(boolean o) {
|
||||||
|
outputOnTicks = o;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Chore that will try to execute the initial chore a few times before succeeding. Once the
|
||||||
|
* initial chore is complete the chore cancels itself
|
||||||
|
*/
|
||||||
|
public static class FailInitialChore extends ScheduledChore {
|
||||||
|
private int numberOfFailures;
|
||||||
|
private int failureThreshold;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param failThreshold Number of times the Chore fails when trying to execute initialChore
|
||||||
|
* before succeeding.
|
||||||
|
*/
|
||||||
|
public FailInitialChore(String name, int period, int failThreshold) {
|
||||||
|
this(name, new SampleStopper(), period, failThreshold);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
|
||||||
|
super(name, stopper, period);
|
||||||
|
numberOfFailures = 0;
|
||||||
|
failureThreshold = failThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean initialChore() {
|
||||||
|
if (numberOfFailures < failureThreshold) {
|
||||||
|
numberOfFailures++;
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
assertTrue(numberOfFailures == failureThreshold);
|
||||||
|
cancel(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitialChorePrecedence() throws InterruptedException {
|
||||||
|
ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
|
||||||
|
|
||||||
|
final int period = 100;
|
||||||
|
final int failureThreshold = 5;
|
||||||
|
ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
|
||||||
|
int loopCount = 0;
|
||||||
|
boolean brokeOutOfLoop = false;
|
||||||
|
|
||||||
|
while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
|
||||||
|
Thread.sleep(failureThreshold * period);
|
||||||
|
loopCount++;
|
||||||
|
if (loopCount > 3) {
|
||||||
|
brokeOutOfLoop = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(brokeOutOfLoop);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelChore() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
ScheduledChore chore1 = new DoNothingChore("chore1", period);
|
||||||
|
ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
|
||||||
|
|
||||||
|
service.scheduleChore(chore1);
|
||||||
|
assertTrue(chore1.isScheduled());
|
||||||
|
|
||||||
|
chore1.cancel(true);
|
||||||
|
assertFalse(chore1.isScheduled());
|
||||||
|
assertTrue(service.getNumberOfScheduledChores() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduledChoreConstruction() {
|
||||||
|
final String NAME = "chore";
|
||||||
|
final int PERIOD = 100;
|
||||||
|
final long VALID_DELAY = 0;
|
||||||
|
final long INVALID_DELAY = -100;
|
||||||
|
final TimeUnit UNIT = TimeUnit.NANOSECONDS;
|
||||||
|
|
||||||
|
ScheduledChore chore1 =
|
||||||
|
new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) {
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
assertEquals("Name construction failed", chore1.getName(), NAME);
|
||||||
|
assertEquals("Period construction failed", chore1.getPeriod(), PERIOD);
|
||||||
|
assertEquals("Initial Delay construction failed", chore1.getInitialDelay(), VALID_DELAY);
|
||||||
|
assertEquals("TimeUnit construction failed", chore1.getTimeUnit(), UNIT);
|
||||||
|
|
||||||
|
ScheduledChore invalidDelayChore =
|
||||||
|
new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) {
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
assertEquals("Initial Delay should be set to 0 when invalid", 0,
|
||||||
|
invalidDelayChore.getInitialDelay());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChoreServiceConstruction() {
|
||||||
|
final int corePoolSize = 10;
|
||||||
|
final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
|
||||||
|
|
||||||
|
ChoreService customInit = new ChoreService(TEST_SERVER_NAME, corePoolSize);
|
||||||
|
assertEquals(corePoolSize, customInit.getCorePoolSize());
|
||||||
|
|
||||||
|
ChoreService defaultInit = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize());
|
||||||
|
|
||||||
|
ChoreService invalidInit = new ChoreService(TEST_SERVER_NAME, -10);
|
||||||
|
assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFrequencyOfChores() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
// Small delta that acts as time buffer (allowing chores to complete if running slowly)
|
||||||
|
final int delta = 5;
|
||||||
|
ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
|
||||||
|
CountingChore chore = new CountingChore("countingChore", period);
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
|
||||||
|
Thread.sleep(10 * period + delta);
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 11);
|
||||||
|
|
||||||
|
Thread.sleep(10 * period);
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 21);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForceTrigger() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
final int delta = 5;
|
||||||
|
ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
|
||||||
|
CountingChore chore = new CountingChore("countingChore", period);
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
Thread.sleep(10 * period + delta);
|
||||||
|
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 11);
|
||||||
|
|
||||||
|
// Force five runs of the chore to occur, sleeping between triggers to ensure the
|
||||||
|
// chore has time to run
|
||||||
|
chore.triggerNow();
|
||||||
|
Thread.sleep(delta);
|
||||||
|
chore.triggerNow();
|
||||||
|
Thread.sleep(delta);
|
||||||
|
chore.triggerNow();
|
||||||
|
Thread.sleep(delta);
|
||||||
|
chore.triggerNow();
|
||||||
|
Thread.sleep(delta);
|
||||||
|
chore.triggerNow();
|
||||||
|
Thread.sleep(delta);
|
||||||
|
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 16);
|
||||||
|
|
||||||
|
Thread.sleep(10 * period + delta);
|
||||||
|
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 26);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorePoolIncrease() throws InterruptedException {
|
||||||
|
final int initialCorePoolSize = 3;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize);
|
||||||
|
assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize,
|
||||||
|
service.getCorePoolSize());
|
||||||
|
|
||||||
|
final int slowChorePeriod = 100;
|
||||||
|
SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
|
||||||
|
SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
|
||||||
|
SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);
|
||||||
|
|
||||||
|
service.scheduleChore(slowChore1);
|
||||||
|
service.scheduleChore(slowChore2);
|
||||||
|
service.scheduleChore(slowChore3);
|
||||||
|
|
||||||
|
Thread.sleep(slowChorePeriod * 10);
|
||||||
|
assertEquals("Should not create more pools than scheduled chores", 3,
|
||||||
|
service.getCorePoolSize());
|
||||||
|
|
||||||
|
SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
|
||||||
|
service.scheduleChore(slowChore4);
|
||||||
|
|
||||||
|
Thread.sleep(slowChorePeriod * 10);
|
||||||
|
assertEquals("Chores are missing their start time. Should expand core pool size", 4,
|
||||||
|
service.getCorePoolSize());
|
||||||
|
|
||||||
|
SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
|
||||||
|
service.scheduleChore(slowChore5);
|
||||||
|
|
||||||
|
Thread.sleep(slowChorePeriod * 10);
|
||||||
|
assertEquals("Chores are missing their start time. Should expand core pool size", 5,
|
||||||
|
service.getCorePoolSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorePoolDecrease() throws InterruptedException {
|
||||||
|
final int initialCorePoolSize = 3;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize);
|
||||||
|
final int chorePeriod = 10;
|
||||||
|
|
||||||
|
// Slow chores always miss their start time and thus the core pool size should be at least as
|
||||||
|
// large as the number of running slow chores
|
||||||
|
SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
|
||||||
|
SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
|
||||||
|
SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);
|
||||||
|
|
||||||
|
service.scheduleChore(slowChore1);
|
||||||
|
service.scheduleChore(slowChore2);
|
||||||
|
service.scheduleChore(slowChore3);
|
||||||
|
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals("Should not create more pools than scheduled chores",
|
||||||
|
service.getNumberOfScheduledChores(), service.getCorePoolSize());
|
||||||
|
|
||||||
|
SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
|
||||||
|
service.scheduleChore(slowChore4);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals("Chores are missing their start time. Should expand core pool size",
|
||||||
|
service.getNumberOfScheduledChores(), service.getCorePoolSize());
|
||||||
|
|
||||||
|
SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
|
||||||
|
service.scheduleChore(slowChore5);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals("Chores are missing their start time. Should expand core pool size",
|
||||||
|
service.getNumberOfScheduledChores(), service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 5);
|
||||||
|
|
||||||
|
slowChore5.cancel();
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 4);
|
||||||
|
|
||||||
|
slowChore4.cancel();
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 3);
|
||||||
|
|
||||||
|
slowChore3.cancel();
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
|
||||||
|
slowChore2.cancel();
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 1);
|
||||||
|
|
||||||
|
slowChore1.cancel();
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 0);
|
||||||
|
|
||||||
|
slowChore1.resetState();
|
||||||
|
service.scheduleChore(slowChore1);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 1);
|
||||||
|
|
||||||
|
slowChore2.resetState();
|
||||||
|
service.scheduleChore(slowChore2);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
|
service.getCorePoolSize());
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
|
||||||
|
DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod);
|
||||||
|
service.scheduleChore(fastChore1);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
assertEquals("Should increase", 3, service.getCorePoolSize());
|
||||||
|
|
||||||
|
DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod);
|
||||||
|
service.scheduleChore(fastChore2);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
assertEquals("Should increase", 3, service.getCorePoolSize());
|
||||||
|
|
||||||
|
DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod);
|
||||||
|
service.scheduleChore(fastChore3);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
assertEquals("Should not change", 3, service.getCorePoolSize());
|
||||||
|
|
||||||
|
DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod);
|
||||||
|
service.scheduleChore(fastChore4);
|
||||||
|
Thread.sleep(chorePeriod * 10);
|
||||||
|
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
|
||||||
|
assertEquals("Should not change", 3, service.getCorePoolSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumberOfRunningChores() throws InterruptedException {
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
|
||||||
|
final int period = 100;
|
||||||
|
final int sleepTime = 5;
|
||||||
|
|
||||||
|
DoNothingChore dn1 = new DoNothingChore("dn1", period);
|
||||||
|
DoNothingChore dn2 = new DoNothingChore("dn2", period);
|
||||||
|
DoNothingChore dn3 = new DoNothingChore("dn3", period);
|
||||||
|
DoNothingChore dn4 = new DoNothingChore("dn4", period);
|
||||||
|
DoNothingChore dn5 = new DoNothingChore("dn5", period);
|
||||||
|
|
||||||
|
service.scheduleChore(dn1);
|
||||||
|
service.scheduleChore(dn2);
|
||||||
|
service.scheduleChore(dn3);
|
||||||
|
service.scheduleChore(dn4);
|
||||||
|
service.scheduleChore(dn5);
|
||||||
|
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores());
|
||||||
|
|
||||||
|
dn1.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores());
|
||||||
|
|
||||||
|
dn2.cancel();
|
||||||
|
dn3.cancel();
|
||||||
|
dn4.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores());
|
||||||
|
|
||||||
|
dn5.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumberOfChoresMissingStartTime() throws InterruptedException {
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
|
||||||
|
final int period = 100;
|
||||||
|
final int sleepTime = 5 * period;
|
||||||
|
|
||||||
|
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
||||||
|
// ALWAYS miss their start time since their execution takes longer than their period
|
||||||
|
SlowChore sc1 = new SlowChore("sc1", period);
|
||||||
|
SlowChore sc2 = new SlowChore("sc2", period);
|
||||||
|
SlowChore sc3 = new SlowChore("sc3", period);
|
||||||
|
SlowChore sc4 = new SlowChore("sc4", period);
|
||||||
|
SlowChore sc5 = new SlowChore("sc5", period);
|
||||||
|
|
||||||
|
service.scheduleChore(sc1);
|
||||||
|
service.scheduleChore(sc2);
|
||||||
|
service.scheduleChore(sc3);
|
||||||
|
service.scheduleChore(sc4);
|
||||||
|
service.scheduleChore(sc5);
|
||||||
|
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals(5, service.getNumberOfChoresMissingStartTime());
|
||||||
|
|
||||||
|
sc1.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals(4, service.getNumberOfChoresMissingStartTime());
|
||||||
|
|
||||||
|
sc2.cancel();
|
||||||
|
sc3.cancel();
|
||||||
|
sc4.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals(1, service.getNumberOfChoresMissingStartTime());
|
||||||
|
|
||||||
|
sc5.cancel();
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertEquals(0, service.getNumberOfChoresMissingStartTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ChoreServices should never have a core pool size that exceeds the number of chores that have
|
||||||
|
* been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a
|
||||||
|
* ChoreService, the number of threads in the ChoreService's core pool should never exceed 4
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMaximumChoreServiceThreads() throws InterruptedException {
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
|
||||||
|
final int period = 10;
|
||||||
|
final int sleepTime = 5 * period;
|
||||||
|
|
||||||
|
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
||||||
|
// ALWAYS miss their start time since their execution takes longer than their period.
|
||||||
|
// Chores that miss their start time will trigger the onChoreMissedStartTime callback
|
||||||
|
// in the ChoreService. This callback will try to increase the number of core pool
|
||||||
|
// threads.
|
||||||
|
SlowChore sc1 = new SlowChore("sc1", period);
|
||||||
|
SlowChore sc2 = new SlowChore("sc2", period);
|
||||||
|
SlowChore sc3 = new SlowChore("sc3", period);
|
||||||
|
SlowChore sc4 = new SlowChore("sc4", period);
|
||||||
|
SlowChore sc5 = new SlowChore("sc5", period);
|
||||||
|
|
||||||
|
service.scheduleChore(sc1);
|
||||||
|
service.scheduleChore(sc2);
|
||||||
|
service.scheduleChore(sc3);
|
||||||
|
service.scheduleChore(sc4);
|
||||||
|
service.scheduleChore(sc5);
|
||||||
|
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
|
||||||
|
|
||||||
|
SlowChore sc6 = new SlowChore("sc6", period);
|
||||||
|
SlowChore sc7 = new SlowChore("sc7", period);
|
||||||
|
SlowChore sc8 = new SlowChore("sc8", period);
|
||||||
|
SlowChore sc9 = new SlowChore("sc9", period);
|
||||||
|
SlowChore sc10 = new SlowChore("sc10", period);
|
||||||
|
|
||||||
|
service.scheduleChore(sc6);
|
||||||
|
service.scheduleChore(sc7);
|
||||||
|
service.scheduleChore(sc8);
|
||||||
|
service.scheduleChore(sc9);
|
||||||
|
service.scheduleChore(sc10);
|
||||||
|
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduledChoreReset() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ScheduledChore chore = new DoNothingChore("sampleChore", period);
|
||||||
|
|
||||||
|
// TRUE
|
||||||
|
assertTrue(!chore.isInitialChoreComplete());
|
||||||
|
assertTrue(chore.getTimeOfLastRun() == -1);
|
||||||
|
assertTrue(chore.getTimeOfThisRun() == -1);
|
||||||
|
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
Thread.sleep(5 * period);
|
||||||
|
|
||||||
|
// FALSE
|
||||||
|
assertFalse(!chore.isInitialChoreComplete());
|
||||||
|
assertFalse(chore.getTimeOfLastRun() == -1);
|
||||||
|
assertFalse(chore.getTimeOfThisRun() == -1);
|
||||||
|
|
||||||
|
chore.resetState();
|
||||||
|
|
||||||
|
// TRUE
|
||||||
|
assertTrue(!chore.isInitialChoreComplete());
|
||||||
|
assertTrue(chore.getTimeOfLastRun() == -1);
|
||||||
|
assertTrue(chore.getTimeOfThisRun() == -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChangingChoreServices() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
final int sleepTime = 10;
|
||||||
|
ChoreService service1 = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ChoreService service2 = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ScheduledChore chore = new DoNothingChore("sample", period);
|
||||||
|
|
||||||
|
assertFalse(chore.isScheduled());
|
||||||
|
assertFalse(service1.isChoreScheduled(chore));
|
||||||
|
assertFalse(service2.isChoreScheduled(chore));
|
||||||
|
assertTrue(chore.getChoreServicer() == null);
|
||||||
|
|
||||||
|
service1.scheduleChore(chore);
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertTrue(chore.isScheduled());
|
||||||
|
assertTrue(service1.isChoreScheduled(chore));
|
||||||
|
assertFalse(service2.isChoreScheduled(chore));
|
||||||
|
assertFalse(chore.getChoreServicer() == null);
|
||||||
|
|
||||||
|
service2.scheduleChore(chore);
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertTrue(chore.isScheduled());
|
||||||
|
assertFalse(service1.isChoreScheduled(chore));
|
||||||
|
assertTrue(service2.isChoreScheduled(chore));
|
||||||
|
assertFalse(chore.getChoreServicer() == null);
|
||||||
|
|
||||||
|
chore.cancel();
|
||||||
|
assertFalse(chore.isScheduled());
|
||||||
|
assertFalse(service1.isChoreScheduled(chore));
|
||||||
|
assertFalse(service2.isChoreScheduled(chore));
|
||||||
|
assertTrue(chore.getChoreServicer() == null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTriggerNowFailsWhenNotScheduled() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
// Small sleep time buffer to allow CountingChore to complete
|
||||||
|
final int sleep = 5;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
CountingChore chore = new CountingChore("dn", period);
|
||||||
|
|
||||||
|
assertFalse(chore.triggerNow());
|
||||||
|
assertTrue(chore.getCountOfChoreCalls() == 0);
|
||||||
|
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
assertEquals(1, chore.getCountOfChoreCalls());
|
||||||
|
Thread.sleep(period);
|
||||||
|
assertEquals(2, chore.getCountOfChoreCalls());
|
||||||
|
assertTrue(chore.triggerNow());
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
assertTrue(chore.triggerNow());
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
assertTrue(chore.triggerNow());
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
assertEquals(5, chore.getCountOfChoreCalls());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopperForScheduledChores() throws InterruptedException {
|
||||||
|
ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
|
||||||
|
Stoppable stopperForGroup1 = new SampleStopper();
|
||||||
|
Stoppable stopperForGroup2 = new SampleStopper();
|
||||||
|
final int period = 100;
|
||||||
|
final int delta = 10;
|
||||||
|
|
||||||
|
ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
|
||||||
|
ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
|
||||||
|
ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
|
||||||
|
|
||||||
|
ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
|
||||||
|
ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
|
||||||
|
ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);
|
||||||
|
|
||||||
|
service.scheduleChore(chore1_group1);
|
||||||
|
service.scheduleChore(chore2_group1);
|
||||||
|
service.scheduleChore(chore3_group1);
|
||||||
|
service.scheduleChore(chore1_group2);
|
||||||
|
service.scheduleChore(chore2_group2);
|
||||||
|
service.scheduleChore(chore3_group2);
|
||||||
|
|
||||||
|
Thread.sleep(delta);
|
||||||
|
Thread.sleep(10 * period);
|
||||||
|
assertTrue(chore1_group1.isScheduled());
|
||||||
|
assertTrue(chore2_group1.isScheduled());
|
||||||
|
assertTrue(chore3_group1.isScheduled());
|
||||||
|
assertTrue(chore1_group2.isScheduled());
|
||||||
|
assertTrue(chore2_group2.isScheduled());
|
||||||
|
assertTrue(chore3_group2.isScheduled());
|
||||||
|
|
||||||
|
stopperForGroup1.stop("test stopping group 1");
|
||||||
|
Thread.sleep(period);
|
||||||
|
assertFalse(chore1_group1.isScheduled());
|
||||||
|
assertFalse(chore2_group1.isScheduled());
|
||||||
|
assertFalse(chore3_group1.isScheduled());
|
||||||
|
assertTrue(chore1_group2.isScheduled());
|
||||||
|
assertTrue(chore2_group2.isScheduled());
|
||||||
|
assertTrue(chore3_group2.isScheduled());
|
||||||
|
|
||||||
|
stopperForGroup2.stop("test stopping group 2");
|
||||||
|
Thread.sleep(period);
|
||||||
|
assertFalse(chore1_group1.isScheduled());
|
||||||
|
assertFalse(chore2_group1.isScheduled());
|
||||||
|
assertFalse(chore3_group1.isScheduled());
|
||||||
|
assertFalse(chore1_group2.isScheduled());
|
||||||
|
assertFalse(chore2_group2.isScheduled());
|
||||||
|
assertFalse(chore3_group2.isScheduled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShutdownCancelsScheduledChores() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
||||||
|
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
||||||
|
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
||||||
|
|
||||||
|
assertTrue(service.scheduleChore(successChore1));
|
||||||
|
assertTrue(successChore1.isScheduled());
|
||||||
|
assertTrue(service.scheduleChore(successChore2));
|
||||||
|
assertTrue(successChore2.isScheduled());
|
||||||
|
assertTrue(service.scheduleChore(successChore3));
|
||||||
|
assertTrue(successChore3.isScheduled());
|
||||||
|
|
||||||
|
service.shutdown();
|
||||||
|
|
||||||
|
assertFalse(successChore1.isScheduled());
|
||||||
|
assertFalse(successChore2.isScheduled());
|
||||||
|
assertFalse(successChore3.isScheduled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
final int sleep = 5 * period;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
|
||||||
|
ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
|
||||||
|
ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
|
||||||
|
|
||||||
|
assertTrue(service.scheduleChore(slowChore1));
|
||||||
|
assertTrue(service.scheduleChore(slowChore2));
|
||||||
|
assertTrue(service.scheduleChore(slowChore3));
|
||||||
|
|
||||||
|
Thread.sleep(sleep / 2);
|
||||||
|
service.shutdown();
|
||||||
|
|
||||||
|
assertFalse(slowChore1.isScheduled());
|
||||||
|
assertFalse(slowChore2.isScheduled());
|
||||||
|
assertFalse(slowChore3.isScheduled());
|
||||||
|
assertTrue(service.isShutdown());
|
||||||
|
|
||||||
|
Thread.sleep(5);
|
||||||
|
assertTrue(service.isTerminated());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShutdownRejectsNewSchedules() throws InterruptedException {
|
||||||
|
final int period = 100;
|
||||||
|
ChoreService service = new ChoreService(TEST_SERVER_NAME);
|
||||||
|
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
||||||
|
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
||||||
|
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
||||||
|
ScheduledChore failChore1 = new DoNothingChore("fc1", period);
|
||||||
|
ScheduledChore failChore2 = new DoNothingChore("fc2", period);
|
||||||
|
ScheduledChore failChore3 = new DoNothingChore("fc3", period);
|
||||||
|
|
||||||
|
assertTrue(service.scheduleChore(successChore1));
|
||||||
|
assertTrue(successChore1.isScheduled());
|
||||||
|
assertTrue(service.scheduleChore(successChore2));
|
||||||
|
assertTrue(successChore2.isScheduled());
|
||||||
|
assertTrue(service.scheduleChore(successChore3));
|
||||||
|
assertTrue(successChore3.isScheduled());
|
||||||
|
|
||||||
|
service.shutdown();
|
||||||
|
|
||||||
|
assertFalse(service.scheduleChore(failChore1));
|
||||||
|
assertFalse(failChore1.isScheduled());
|
||||||
|
assertFalse(service.scheduleChore(failChore2));
|
||||||
|
assertFalse(failChore2.isScheduled());
|
||||||
|
assertFalse(service.scheduleChore(failChore3));
|
||||||
|
assertFalse(failChore3.isScheduled());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue