HBASE-13007 Fix the test timeouts being caused by ChoreService (Jonathan Lawlor); ADDENDUM

This commit is contained in:
stack 2015-02-11 20:46:41 -08:00
parent 4969368bf1
commit aca9aacfb4
3 changed files with 32 additions and 86 deletions

View File

@ -161,7 +161,7 @@ public class ChoreService implements ChoreServicer {
@Override
public synchronized void cancelChore(ScheduledChore chore) {
cancelChore(chore, false);
cancelChore(chore, true);
}
@Override

View File

@ -164,26 +164,19 @@ public abstract class ScheduledChore implements Runnable {
this.timeUnit = unit;
}
synchronized void resetState() {
timeOfLastRun = -1;
timeOfThisRun = -1;
initialChoreComplete = false;
}
/**
* @see java.lang.Thread#run()
*/
@Override
public synchronized void run() {
timeOfLastRun = timeOfThisRun;
timeOfThisRun = System.currentTimeMillis();
public void run() {
updateTimeTrackingBeforeRun();
if (missedStartTime() && isScheduled()) {
choreServicer.onChoreMissedStartTime(this);
onChoreMissedStartTime();
if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
} else if (stopper.isStopped() || choreServicer == null || !isScheduled()) {
cancel();
} else if (stopper.isStopped() || !isScheduled()) {
cancel(false);
cleanup();
LOG.info("Chore: " + getName() + " was stopped");
if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
} else {
try {
if (!initialChoreComplete) {
@ -192,15 +185,33 @@ public abstract class ScheduledChore implements Runnable {
chore();
}
} catch (Throwable t) {
LOG.error("Caught error", t);
if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
if (this.stopper.isStopped()) {
cancel();
cancel(false);
cleanup();
}
}
}
}
/**
* Update our time tracking members. Called at the start of an execution of this chore's run()
* method so that a correct decision can be made as to whether or not we missed the start time
*/
private synchronized void updateTimeTrackingBeforeRun() {
timeOfLastRun = timeOfThisRun;
timeOfThisRun = System.currentTimeMillis();
}
/**
* Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
* make the decision as to whether or not it would be worthwhile to increase the number of core
* pool threads
*/
private synchronized void onChoreMissedStartTime() {
if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
}
/**
* @return How long has it been since this chore last run. Useful for checking if the chore has
* missed its scheduled start time by too large of a margin
@ -248,7 +259,7 @@ public abstract class ScheduledChore implements Runnable {
}
public synchronized void cancel() {
cancel(false);
cancel(true);
}
public synchronized void cancel(boolean mayInterruptIfRunning) {
@ -317,7 +328,7 @@ public abstract class ScheduledChore implements Runnable {
* Override to run a task before we start looping.
* @return true if initial chore was successful
*/
protected synchronized boolean initialChore() {
protected boolean initialChore() {
// Default does nothing
return true;
}

View File

@ -423,7 +423,7 @@ public class TestChoreService {
shutdownService(service);
}
@Test (timeout=20000)
@Test(timeout = 30000)
public void testCorePoolDecrease() throws InterruptedException {
final int initialCorePoolSize = 3;
ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize);
@ -456,6 +456,8 @@ public class TestChoreService {
service.getNumberOfScheduledChores(), service.getCorePoolSize());
assertEquals(service.getNumberOfChoresMissingStartTime(), 5);
// Now we begin to cancel the chores that caused an increase in the core thread pool of the
// ChoreService. These cancellations should cause a decrease in the core thread pool.
slowChore5.cancel();
Thread.sleep(chorePeriod * 10);
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
@ -486,44 +488,6 @@ public class TestChoreService {
service.getCorePoolSize());
assertEquals(service.getNumberOfChoresMissingStartTime(), 0);
slowChore1.resetState();
service.scheduleChore(slowChore1);
Thread.sleep(chorePeriod * 10);
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
service.getCorePoolSize());
assertEquals(service.getNumberOfChoresMissingStartTime(), 1);
slowChore2.resetState();
service.scheduleChore(slowChore2);
Thread.sleep(chorePeriod * 10);
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
service.getCorePoolSize());
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod);
service.scheduleChore(fastChore1);
Thread.sleep(chorePeriod * 10);
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
assertEquals("Should increase", 3, service.getCorePoolSize());
DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod);
service.scheduleChore(fastChore2);
Thread.sleep(chorePeriod * 10);
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
assertEquals("Should increase", 3, service.getCorePoolSize());
DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod);
service.scheduleChore(fastChore3);
Thread.sleep(chorePeriod * 10);
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
assertEquals("Should not change", 3, service.getCorePoolSize());
DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod);
service.scheduleChore(fastChore4);
Thread.sleep(chorePeriod * 10);
assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
assertEquals("Should not change", 3, service.getCorePoolSize());
shutdownService(service);
}
@ -657,35 +621,6 @@ public class TestChoreService {
shutdownService(service);
}
@Test (timeout=20000)
public void testScheduledChoreReset() throws InterruptedException {
final int period = 100;
ChoreService service = new ChoreService("testScheduledChoreReset");
ScheduledChore chore = new DoNothingChore("sampleChore", period);
// TRUE
assertTrue(!chore.isInitialChoreComplete());
assertTrue(chore.getTimeOfLastRun() == -1);
assertTrue(chore.getTimeOfThisRun() == -1);
service.scheduleChore(chore);
Thread.sleep(5 * period);
// FALSE
assertFalse(!chore.isInitialChoreComplete());
assertFalse(chore.getTimeOfLastRun() == -1);
assertFalse(chore.getTimeOfThisRun() == -1);
chore.resetState();
// TRUE
assertTrue(!chore.isInitialChoreComplete());
assertTrue(chore.getTimeOfLastRun() == -1);
assertTrue(chore.getTimeOfThisRun() == -1);
shutdownService(service);
}
@Test (timeout=20000)
public void testChangingChoreServices() throws InterruptedException {
final int period = 100;