From 559a1f0a28c5a4e05773b0a652241e056281201e Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 20 Jan 2021 16:10:36 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-25509=20ChoreService.cancelChore=20will?= =?UTF-8?q?=20not=20call=20ScheduledChore.cle=E2=80=A6=20(#2890)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viraj Jasani --- .../org/apache/hadoop/hbase/ChoreService.java | 135 ++- .../apache/hadoop/hbase/ScheduledChore.java | 103 +- .../apache/hadoop/hbase/TestChoreService.java | 1050 ++++++++--------- .../apache/hadoop/hbase/master/HMaster.java | 38 +- .../hbase/master/RegionsRecoveryChore.java | 22 - .../master/RegionsRecoveryConfigManager.java | 45 +- .../hadoop/hbase/master/SplitLogManager.java | 2 +- .../hadoop/hbase/quotas/QuotaCache.java | 2 +- .../quotas/RegionServerSpaceQuotaManager.java | 4 +- .../hbase/regionserver/HRegionServer.java | 23 +- .../hbase/regionserver/HeapMemoryManager.java | 2 +- .../TestRegionsRecoveryConfigManager.java | 58 +- .../master/janitor/TestCatalogJanitor.java | 2 +- 13 files changed, 703 insertions(+), 783 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java index 39c3ccc6919..5bd67ad02ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase; +import com.google.errorprone.annotations.RestrictedApi; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -26,8 +27,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory; * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. */ @InterfaceAudience.Public -public class ChoreService implements ChoreServicer { +public class ChoreService { private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class); /** @@ -141,28 +140,39 @@ public class ChoreService implements ChoreServicer { * @return true when the chore was successfully scheduled. false when the scheduling failed * (typically occurs when a chore is scheduled during shutdown of service) */ - public synchronized boolean scheduleChore(ScheduledChore chore) { + public boolean scheduleChore(ScheduledChore chore) { if (chore == null) { return false; } - - try { - if (chore.getPeriod() <= 0) { - LOG.info("Chore {} is disabled because its period is not positive.", chore); - return false; + // always lock chore first to prevent dead lock + synchronized (chore) { + synchronized (this) { + try { + // Chores should only ever be scheduled with a single ChoreService. If the choreService + // is changing, cancel any existing schedules of this chore. + if (chore.getChoreService() == this) { + LOG.warn("Chore {} has already been scheduled with us", chore); + return false; + } + if (chore.getPeriod() <= 0) { + LOG.info("Chore {} is disabled because its period is not positive.", chore); + return false; + } + LOG.info("Chore {} is enabled.", chore); + if (chore.getChoreService() != null) { + LOG.info("Cancel chore {} from its previous service", chore); + chore.getChoreService().cancelChore(chore); + } + chore.setChoreService(this); + ScheduledFuture future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), + chore.getPeriod(), chore.getTimeUnit()); + scheduledChores.put(chore, future); + return true; + } catch (Exception e) { + LOG.error("Could not successfully schedule chore: {}", chore.getName(), e); + return false; + } } - LOG.info("Chore {} is enabled.", chore); - chore.setChoreServicer(this); - ScheduledFuture future = - scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(), - chore.getTimeUnit()); - scheduledChores.put(chore, future); - return true; - } catch (Exception exception) { - if (LOG.isInfoEnabled()) { - LOG.info("Could not successfully schedule chore: " + chore.getName()); - } - return false; } } @@ -175,19 +185,35 @@ public class ChoreService implements ChoreServicer { ScheduledFuture future = scheduledChores.get(chore); future.cancel(false); } - scheduleChore(chore); + ScheduledFuture future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), + chore.getPeriod(), chore.getTimeUnit()); + scheduledChores.put(chore, future); } - @InterfaceAudience.Private - @Override - public synchronized void cancelChore(ScheduledChore chore) { + /** + * Cancel any ongoing schedules that this chore has with the implementer of this interface. + *

+ * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in + * {@link ScheduledChore#cancel()} method we will call this method to remove the + * {@link ScheduledChore} from this {@link ChoreService}. + */ + @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") + synchronized void cancelChore(ScheduledChore chore) { cancelChore(chore, true); } - @InterfaceAudience.Private - @Override - public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { - if (chore != null && scheduledChores.containsKey(chore)) { + /** + * Cancel any ongoing schedules that this chore has with the implementer of this interface. + *

+ * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in + * {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the + * {@link ScheduledChore} from this {@link ChoreService}. + */ + @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") + synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { + if (scheduledChores.containsKey(chore)) { ScheduledFuture future = scheduledChores.get(chore); future.cancel(mayInterruptIfRunning); scheduledChores.remove(chore); @@ -201,21 +227,24 @@ public class ChoreService implements ChoreServicer { } } + /** + * @return true when the chore is scheduled with the implementer of this interface + */ @InterfaceAudience.Private - @Override public synchronized boolean isChoreScheduled(ScheduledChore chore) { return chore != null && scheduledChores.containsKey(chore) && !scheduledChores.get(chore).isDone(); } - @InterfaceAudience.Private - @Override - public synchronized boolean triggerNow(ScheduledChore chore) { - if (chore != null) { - rescheduleChore(chore); - return true; - } - return false; + /** + * This method tries to execute the chore immediately. If the chore is executing at the time of + * this call, the chore will begin another execution as soon as the current execution finishes + */ + @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") + synchronized void triggerNow(ScheduledChore chore) { + assert chore.getChoreService() == this; + rescheduleChore(chore); } /** @@ -295,10 +324,20 @@ public class ChoreService implements ChoreServicer { } } - @InterfaceAudience.Private - @Override - public synchronized void onChoreMissedStartTime(ScheduledChore chore) { - if (chore == null || !scheduledChores.containsKey(chore)) return; + /** + * A callback that tells the implementer of this interface that one of the scheduled chores is + * missing its start time. The implication of a chore missing its start time is that the service's + * current means of scheduling may not be sufficient to handle the number of ongoing chores (the + * other explanation is that the chore's execution time is greater than its scheduled period). The + * service should try to increase its concurrency when this callback is received. + * @param chore The chore that missed its start time + */ + @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") + synchronized void onChoreMissedStartTime(ScheduledChore chore) { + if (!scheduledChores.containsKey(chore)) { + return; + } // If the chore has not caused an increase in the size of the core thread pool then request an // increase. This allows each chore missing its start time to increase the core pool size by @@ -319,13 +358,17 @@ public class ChoreService implements ChoreServicer { * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores * in the middle of execution will be interrupted and shutdown. This service will be unusable * after this method has been called (i.e. future scheduling attempts will fail). + *

+ * Notice that, this will only clean the chore from this ChoreService but you could still schedule + * the chore with other ChoreService. */ public synchronized void shutdown() { - scheduler.shutdownNow(); - if (LOG.isInfoEnabled()) { - LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet() - + " on shutdown"); + if (isShutdown()) { + return; } + scheduler.shutdownNow(); + LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix, + scheduledChores.keySet()); cancelAllChores(true); scheduledChores.clear(); choresMissingStartTime.clear(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java index 1fb5b7e9e34..6155bbdeb3b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase; +import com.google.errorprone.annotations.RestrictedApi; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.yetus.audience.InterfaceAudience; @@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory; * execute within the defined period. It is bad practice to define a ScheduledChore whose execution * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s * thread pool. - *

+ *

* Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as * an entry being added to a queue, etc. */ @@ -60,7 +61,7 @@ public abstract class ScheduledChore implements Runnable { * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is * not scheduled. */ - private ChoreServicer choreServicer; + private ChoreService choreService; /** * Variables that encapsulate the meaningful state information @@ -77,39 +78,6 @@ public abstract class ScheduledChore implements Runnable { */ private final Stoppable stopper; - interface ChoreServicer { - /** - * Cancel any ongoing schedules that this chore has with the implementer of this interface. - */ - public void cancelChore(ScheduledChore chore); - public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); - - /** - * @return true when the chore is scheduled with the implementer of this interface - */ - public boolean isChoreScheduled(ScheduledChore chore); - - /** - * This method tries to execute the chore immediately. If the chore is executing at the time of - * this call, the chore will begin another execution as soon as the current execution finishes - *

- * If the chore is not scheduled with a ChoreService, this call will fail. - * @return false when the chore could not be triggered immediately - */ - public boolean triggerNow(ScheduledChore chore); - - /** - * A callback that tells the implementer of this interface that one of the scheduled chores is - * missing its start time. The implication of a chore missing its start time is that the - * service's current means of scheduling may not be sufficient to handle the number of ongoing - * chores (the other explanation is that the chore's execution time is greater than its - * scheduled period). The service should try to increase its concurrency when this callback is - * received. - * @param chore The chore that missed its start time - */ - public void onChoreMissedStartTime(ScheduledChore chore); - } - /** * This constructor is for test only. It allows us to create an object and to call chore() on it. */ @@ -168,8 +136,8 @@ public abstract class ScheduledChore implements Runnable { onChoreMissedStartTime(); LOG.info("Chore: {} missed its start time", getName()); } else if (stopper.isStopped() || !isScheduled()) { - cancel(false); - cleanup(); + // call shutdown here to cleanup the ScheduledChore. + shutdown(false); LOG.info("Chore: {} was stopped", getName()); } else { try { @@ -193,7 +161,6 @@ public abstract class ScheduledChore implements Runnable { LOG.error("Caught error", t); if (this.stopper.isStopped()) { cancel(false); - cleanup(); } } } @@ -214,7 +181,9 @@ public abstract class ScheduledChore implements Runnable { * pool threads */ private synchronized void onChoreMissedStartTime() { - if (choreServicer != null) choreServicer.onChoreMissedStartTime(this); + if (choreService != null) { + choreService.onChoreMissedStartTime(this); + } } /** @@ -253,20 +222,17 @@ public abstract class ScheduledChore implements Runnable { * @return false when the Chore is not currently scheduled with a ChoreService */ public synchronized boolean triggerNow() { - if (choreServicer != null) { - return choreServicer.triggerNow(this); - } else { + if (choreService == null) { return false; } + choreService.triggerNow(this); + return true; } - synchronized void setChoreServicer(ChoreServicer service) { - // Chores should only ever be scheduled with a single ChoreService. If the choreServicer - // is changing, cancel any existing schedules of this chore. - if (choreServicer != null && choreServicer != service) { - choreServicer.cancelChore(this, false); - } - choreServicer = service; + @RestrictedApi(explanation = "Should only be called in ChoreService", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/ChoreService.java") + synchronized void setChoreService(ChoreService service) { + choreService = service; timeOfThisRun = -1; } @@ -275,9 +241,10 @@ public abstract class ScheduledChore implements Runnable { } public synchronized void cancel(boolean mayInterruptIfRunning) { - if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning); - - choreServicer = null; + if (isScheduled()) { + choreService.cancelChore(this, mayInterruptIfRunning); + } + choreService = null; } public String getName() { @@ -310,17 +277,14 @@ public abstract class ScheduledChore implements Runnable { return initialChoreComplete; } - @InterfaceAudience.Private - synchronized ChoreServicer getChoreServicer() { - return choreServicer; + synchronized ChoreService getChoreService() { + return choreService; } - @InterfaceAudience.Private synchronized long getTimeOfLastRun() { return timeOfLastRun; } - @InterfaceAudience.Private synchronized long getTimeOfThisRun() { return timeOfThisRun; } @@ -329,10 +293,12 @@ public abstract class ScheduledChore implements Runnable { * @return true when this Chore is scheduled with a ChoreService */ public synchronized boolean isScheduled() { - return choreServicer != null && choreServicer.isChoreScheduled(this); + return choreService != null && choreService.isChoreScheduled(this); } @InterfaceAudience.Private + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") public synchronized void choreForTesting() { chore(); } @@ -354,7 +320,26 @@ public abstract class ScheduledChore implements Runnable { /** * Override to run cleanup tasks when the Chore encounters an error and must stop running */ - protected synchronized void cleanup() { + protected void cleanup() { + } + + /** + * Call {@link #shutdown(boolean)} with {@code true}. + * @see ScheduledChore#shutdown(boolean) + */ + public synchronized void shutdown() { + shutdown(true); + } + + /** + * Completely shutdown the ScheduleChore, which means we will call cleanup and you should not + * schedule it again. + *

+ * This is another path to cleanup the chore, comparing to stop the stopper instance passed in. + */ + public synchronized void shutdown(boolean mayInterruptIfRunning) { + cancel(mayInterruptIfRunning); + cleanup(); } /** diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java index 69a171c205f..64a076a6063 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -20,16 +20,18 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore; -import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -38,261 +40,234 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Category(MediumTests.class) +@Category({ MiscTests.class, MediumTests.class }) public class TestChoreService { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestChoreService.class); + HBaseClassTestRule.forClass(TestChoreService.class); - public static final Logger log = LoggerFactory.getLogger(TestChoreService.class); + private static final Logger LOG = LoggerFactory.getLogger(TestChoreService.class); + + private static final Configuration CONF = HBaseConfiguration.create(); @Rule public TestName name = new TestName(); + private int initialCorePoolSize = 3; + + private ChoreService service; + + @Before + public void setUp() { + service = new ChoreService(name.getMethodName(), initialCorePoolSize, false); + } + + @After + public void tearDown() { + shutdownService(service); + } + /** - * A few ScheduledChore samples that are useful for testing with ChoreService + * Straight forward stopper implementation that is used by default when one is not provided */ - public static class ScheduledChoreSamples { - /** - * Straight forward stopper implementation that is used by default when one is not provided - */ - public static class SampleStopper implements Stoppable { - private boolean stopped = false; + private static class SampleStopper implements Stoppable { + private boolean stopped = false; - @Override - public void stop(String why) { - stopped = true; + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + } + + /** + * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic + * executions + */ + private static class SlowChore extends ScheduledChore { + public SlowChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public SlowChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected boolean initialChore() { + Threads.sleep(getPeriod() * 2); + return true; + } + + @Override + protected void chore() { + Threads.sleep(getPeriod() * 2); + } + } + + /** + * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests + */ + private static class DoNothingChore extends ScheduledChore { + + public DoNothingChore(String name, int period) { + super(name, new SampleStopper(), period); + } + + public DoNothingChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected void chore() { + // DO NOTHING + } + } + + private static class SleepingChore extends ScheduledChore { + private int sleepTime; + + public SleepingChore(String name, int chorePeriod, int sleepTime) { + this(name, new SampleStopper(), chorePeriod, sleepTime); + } + + public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { + super(name, stopper, period); + this.sleepTime = sleepTime; + } + + @Override + protected boolean initialChore() { + Threads.sleep(sleepTime); + return true; + } + + @Override + protected void chore() { + Threads.sleep(sleepTime); + } + } + + private static class CountingChore extends ScheduledChore { + private int countOfChoreCalls; + private boolean outputOnTicks = false; + + public CountingChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public CountingChore(String name, Stoppable stopper, int period) { + this(name, stopper, period, false); + } + + public CountingChore(String name, Stoppable stopper, int period, final boolean outputOnTicks) { + super(name, stopper, period); + this.countOfChoreCalls = 0; + this.outputOnTicks = outputOnTicks; + } + + @Override + protected boolean initialChore() { + countOfChoreCalls++; + if (outputOnTicks) { + outputTickCount(); } + return true; + } - @Override - public boolean isStopped() { - return stopped; + @Override + protected void chore() { + countOfChoreCalls++; + if (outputOnTicks) { + outputTickCount(); } } + private void outputTickCount() { + LOG.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); + } + + public int getCountOfChoreCalls() { + return countOfChoreCalls; + } + } + + /** + * A Chore that will try to execute the initial chore a few times before succeeding. Once the + * initial chore is complete the chore cancels itself + */ + public static class FailInitialChore extends ScheduledChore { + private int numberOfFailures; + private int failureThreshold; + /** - * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic - * executions + * @param failThreshold Number of times the Chore fails when trying to execute initialChore + * before succeeding. */ - public static class SlowChore extends ScheduledChore { - public SlowChore(String name, int period) { - this(name, new SampleStopper(), period); - } + public FailInitialChore(String name, int period, int failThreshold) { + this(name, new SampleStopper(), period, failThreshold); + } - public SlowChore(String name, Stoppable stopper, int period) { - super(name, stopper, period); - } + public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { + super(name, stopper, period); + numberOfFailures = 0; + failureThreshold = failThreshold; + } - @Override - protected boolean initialChore() { - try { - Thread.sleep(getPeriod() * 2); - } catch (InterruptedException e) { - log.warn("", e); - } + @Override + protected boolean initialChore() { + if (numberOfFailures < failureThreshold) { + numberOfFailures++; + return false; + } else { return true; } - - @Override - protected void chore() { - try { - Thread.sleep(getPeriod() * 2); - } catch (InterruptedException e) { - log.warn("", e); - } - } } - /** - * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests - */ - public static class DoNothingChore extends ScheduledChore { - public DoNothingChore(String name, int period) { - super(name, new SampleStopper(), period); - } - - public DoNothingChore(String name, Stoppable stopper, int period) { - super(name, stopper, period); - } - - @Override - protected void chore() { - // DO NOTHING - } - - } - - public static class SleepingChore extends ScheduledChore { - private int sleepTime; - - public SleepingChore(String name, int chorePeriod, int sleepTime) { - this(name, new SampleStopper(), chorePeriod, sleepTime); - } - - public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { - super(name, stopper, period); - this.sleepTime = sleepTime; - } - - @Override - protected boolean initialChore() { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - log.warn("", e); - } - return true; - } - - @Override - protected void chore() { - try { - Thread.sleep(sleepTime); - } catch (Exception e) { - log.warn("", e); - } - } - } - - public static class CountingChore extends ScheduledChore { - private int countOfChoreCalls; - private boolean outputOnTicks = false; - - public CountingChore(String name, int period) { - this(name, new SampleStopper(), period); - } - - public CountingChore(String name, Stoppable stopper, int period) { - this(name, stopper, period, false); - } - - public CountingChore(String name, Stoppable stopper, int period, - final boolean outputOnTicks) { - super(name, stopper, period); - this.countOfChoreCalls = 0; - this.outputOnTicks = outputOnTicks; - } - - @Override - protected boolean initialChore() { - countOfChoreCalls++; - if (outputOnTicks) { - outputTickCount(); - } - return true; - } - - @Override - protected void chore() { - countOfChoreCalls++; - if (outputOnTicks) { - outputTickCount(); - } - } - - private void outputTickCount() { - log.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); - } - - public int getCountOfChoreCalls() { - return countOfChoreCalls; - } - - public boolean isOutputtingOnTicks() { - return outputOnTicks; - } - - public void setOutputOnTicks(boolean o) { - outputOnTicks = o; - } - } - - /** - * A Chore that will try to execute the initial chore a few times before succeeding. Once the - * initial chore is complete the chore cancels itself - */ - public static class FailInitialChore extends ScheduledChore { - private int numberOfFailures; - private int failureThreshold; - - /** - * @param failThreshold Number of times the Chore fails when trying to execute initialChore - * before succeeding. - */ - public FailInitialChore(String name, int period, int failThreshold) { - this(name, new SampleStopper(), period, failThreshold); - } - - public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { - super(name, stopper, period); - numberOfFailures = 0; - failureThreshold = failThreshold; - } - - @Override - protected boolean initialChore() { - if (numberOfFailures < failureThreshold) { - numberOfFailures++; - return false; - } else { - return true; - } - } - - @Override - protected void chore() { - assertTrue(numberOfFailures == failureThreshold); - cancel(false); - } - + @Override + protected void chore() { + assertTrue(numberOfFailures == failureThreshold); + cancel(false); } } @Test public void testInitialChorePrecedence() throws InterruptedException { - ChoreService service = new ChoreService("testInitialChorePrecedence"); - final int period = 100; final int failureThreshold = 5; + ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); + service.scheduleChore(chore); - try { - ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); - service.scheduleChore(chore); + int loopCount = 0; + boolean brokeOutOfLoop = false; - int loopCount = 0; - boolean brokeOutOfLoop = false; - - while (!chore.isInitialChoreComplete() && chore.isScheduled()) { - Thread.sleep(failureThreshold * period); - loopCount++; - if (loopCount > 3) { - brokeOutOfLoop = true; - break; - } + while (!chore.isInitialChoreComplete() && chore.isScheduled()) { + Thread.sleep(failureThreshold * period); + loopCount++; + if (loopCount > 3) { + brokeOutOfLoop = true; + break; } - - assertFalse(brokeOutOfLoop); - } finally { - shutdownService(service); } + + assertFalse(brokeOutOfLoop); } @Test public void testCancelChore() throws InterruptedException { final int period = 100; - ScheduledChore chore1 = new DoNothingChore("chore1", period); - ChoreService service = new ChoreService("testCancelChore"); - try { - service.scheduleChore(chore1); - assertTrue(chore1.isScheduled()); + ScheduledChore chore = new DoNothingChore("chore", period); + service.scheduleChore(chore); + assertTrue(chore.isScheduled()); - chore1.cancel(true); - assertFalse(chore1.isScheduled()); - assertTrue(service.getNumberOfScheduledChores() == 0); - } finally { - shutdownService(service); - } + chore.cancel(true); + assertFalse(chore.isScheduled()); + assertTrue(service.getNumberOfScheduledChores() == 0); } @Test @@ -304,12 +279,12 @@ public class TestChoreService { final TimeUnit UNIT = TimeUnit.NANOSECONDS; ScheduledChore chore1 = - new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { - @Override - protected void chore() { - // DO NOTHING - } - }; + new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; assertEquals("Name construction failed", NAME, chore1.getName()); assertEquals("Period construction failed", PERIOD, chore1.getPeriod()); @@ -317,12 +292,12 @@ public class TestChoreService { assertEquals("TimeUnit construction failed", UNIT, chore1.getTimeUnit()); ScheduledChore invalidDelayChore = - new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { - @Override - protected void chore() { - // DO NOTHING - } - }; + new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; assertEquals("Initial Delay should be set to 0 when invalid", 0, invalidDelayChore.getInitialDelay()); @@ -334,7 +309,7 @@ public class TestChoreService { final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; ChoreService customInit = - new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); + new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); try { assertEquals(corePoolSize, customInit.getCorePoolSize()); } finally { @@ -360,258 +335,218 @@ public class TestChoreService { public void testFrequencyOfChores() throws InterruptedException { final int period = 100; // Small delta that acts as time buffer (allowing chores to complete if running slowly) - final int delta = period/5; - ChoreService service = new ChoreService("testFrequencyOfChores"); + final int delta = period / 5; CountingChore chore = new CountingChore("countingChore", period); - try { - service.scheduleChore(chore); + service.scheduleChore(chore); - Thread.sleep(10 * period + delta); - assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); + Thread.sleep(10 * period + delta); + assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); - Thread.sleep(10 * period + delta); - assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls()); - } finally { - shutdownService(service); - } + Thread.sleep(10 * period + delta); + assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls()); } - public void shutdownService(ChoreService service) throws InterruptedException { + public void shutdownService(ChoreService service) { service.shutdown(); - while (!service.isTerminated()) { - Thread.sleep(100); - } + Waiter.waitFor(CONF, 1000, () -> service.isTerminated()); } @Test public void testForceTrigger() throws InterruptedException { final int period = 100; - final int delta = period/10; - ChoreService service = new ChoreService("testForceTrigger"); + final int delta = period / 10; final CountingChore chore = new CountingChore("countingChore", period); - try { - service.scheduleChore(chore); - Thread.sleep(10 * period + delta); + service.scheduleChore(chore); + Thread.sleep(10 * period + delta); - assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); + assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); - // Force five runs of the chore to occur, sleeping between triggers to ensure the - // chore has time to run - chore.triggerNow(); - Thread.sleep(delta); - chore.triggerNow(); - Thread.sleep(delta); - chore.triggerNow(); - Thread.sleep(delta); - chore.triggerNow(); - Thread.sleep(delta); - chore.triggerNow(); - Thread.sleep(delta); + // Force five runs of the chore to occur, sleeping between triggers to ensure the + // chore has time to run + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); - assertEquals("Trigger was called 5 times after 10 periods.", 16, - chore.getCountOfChoreCalls()); + assertEquals("Trigger was called 5 times after 10 periods.", 16, chore.getCountOfChoreCalls()); - Thread.sleep(10 * period + delta); + Thread.sleep(10 * period + delta); - // Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing. - assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(), - chore.getCountOfChoreCalls() > 16); - } finally { - shutdownService(service); - } + // Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing. + assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(), + chore.getCountOfChoreCalls() > 16); } @Test public void testCorePoolIncrease() throws InterruptedException { - final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false); + assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize, + service.getCorePoolSize()); - try { - assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize, - service.getCorePoolSize()); + final int slowChorePeriod = 100; + SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); - final int slowChorePeriod = 100; - SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); - SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); - SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); - service.scheduleChore(slowChore1); - service.scheduleChore(slowChore2); - service.scheduleChore(slowChore3); + Thread.sleep(slowChorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, + service.getCorePoolSize()); - Thread.sleep(slowChorePeriod * 10); - assertEquals("Should not create more pools than scheduled chores", 3, - service.getCorePoolSize()); + SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); + service.scheduleChore(slowChore4); - SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); - service.scheduleChore(slowChore4); + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); - Thread.sleep(slowChorePeriod * 10); - assertEquals("Chores are missing their start time. Should expand core pool size", 4, - service.getCorePoolSize()); + SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); + service.scheduleChore(slowChore5); - SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); - service.scheduleChore(slowChore5); - - Thread.sleep(slowChorePeriod * 10); - assertEquals("Chores are missing their start time. Should expand core pool size", 5, - service.getCorePoolSize()); - } finally { - shutdownService(service); - } + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 5, + service.getCorePoolSize()); } @Test public void testCorePoolDecrease() throws InterruptedException { - final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false); final int chorePeriod = 100; - try { - // Slow chores always miss their start time and thus the core pool size should be at least as - // large as the number of running slow chores - SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); - SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); - SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); + // Slow chores always miss their start time and thus the core pool size should be at least as + // large as the number of running slow chores + SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); - service.scheduleChore(slowChore1); - service.scheduleChore(slowChore2); - service.scheduleChore(slowChore3); + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); - Thread.sleep(chorePeriod * 10); - assertEquals("Should not create more pools than scheduled chores", - service.getNumberOfScheduledChores(), service.getCorePoolSize()); + Thread.sleep(chorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); - SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); - service.scheduleChore(slowChore4); - Thread.sleep(chorePeriod * 10); - assertEquals("Chores are missing their start time. Should expand core pool size", - service.getNumberOfScheduledChores(), service.getCorePoolSize()); + SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); + service.scheduleChore(slowChore4); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); - SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); - service.scheduleChore(slowChore5); - Thread.sleep(chorePeriod * 10); - assertEquals("Chores are missing their start time. Should expand core pool size", - service.getNumberOfScheduledChores(), service.getCorePoolSize()); - assertEquals(5, service.getNumberOfChoresMissingStartTime()); + SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); + service.scheduleChore(slowChore5); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + assertEquals(5, service.getNumberOfChoresMissingStartTime()); - // Now we begin to cancel the chores that caused an increase in the core thread pool of the - // ChoreService. These cancellations should cause a decrease in the core thread pool. - slowChore5.cancel(); - Thread.sleep(chorePeriod * 10); - assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), - service.getCorePoolSize()); - assertEquals(4, service.getNumberOfChoresMissingStartTime()); + // Now we begin to cancel the chores that caused an increase in the core thread pool of the + // ChoreService. These cancellations should cause a decrease in the core thread pool. + slowChore5.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(4, service.getNumberOfChoresMissingStartTime()); - slowChore4.cancel(); - Thread.sleep(chorePeriod * 10); - assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), - service.getCorePoolSize()); - assertEquals(3, service.getNumberOfChoresMissingStartTime()); + slowChore4.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(3, service.getNumberOfChoresMissingStartTime()); - slowChore3.cancel(); - Thread.sleep(chorePeriod * 10); - assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), - service.getCorePoolSize()); - assertEquals(2, service.getNumberOfChoresMissingStartTime()); + slowChore3.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(2, service.getNumberOfChoresMissingStartTime()); - slowChore2.cancel(); - Thread.sleep(chorePeriod * 10); - assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), - service.getCorePoolSize()); - assertEquals(1, service.getNumberOfChoresMissingStartTime()); + slowChore2.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(1, service.getNumberOfChoresMissingStartTime()); - slowChore1.cancel(); - Thread.sleep(chorePeriod * 10); - assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), - service.getCorePoolSize()); - assertEquals(0, service.getNumberOfChoresMissingStartTime()); - } finally { - shutdownService(service); - } + slowChore1.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(0, service.getNumberOfChoresMissingStartTime()); } @Test public void testNumberOfRunningChores() throws InterruptedException { - ChoreService service = new ChoreService("testNumberOfRunningChores"); - final int period = 100; final int sleepTime = 5; + DoNothingChore dn1 = new DoNothingChore("dn1", period); + DoNothingChore dn2 = new DoNothingChore("dn2", period); + DoNothingChore dn3 = new DoNothingChore("dn3", period); + DoNothingChore dn4 = new DoNothingChore("dn4", period); + DoNothingChore dn5 = new DoNothingChore("dn5", period); - try { - DoNothingChore dn1 = new DoNothingChore("dn1", period); - DoNothingChore dn2 = new DoNothingChore("dn2", period); - DoNothingChore dn3 = new DoNothingChore("dn3", period); - DoNothingChore dn4 = new DoNothingChore("dn4", period); - DoNothingChore dn5 = new DoNothingChore("dn5", period); + service.scheduleChore(dn1); + service.scheduleChore(dn2); + service.scheduleChore(dn3); + service.scheduleChore(dn4); + service.scheduleChore(dn5); - service.scheduleChore(dn1); - service.scheduleChore(dn2); - service.scheduleChore(dn3); - service.scheduleChore(dn4); - service.scheduleChore(dn5); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); - Thread.sleep(sleepTime); - assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); + dn1.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores()); - dn1.cancel(); - Thread.sleep(sleepTime); - assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores()); + dn2.cancel(); + dn3.cancel(); + dn4.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); - dn2.cancel(); - dn3.cancel(); - dn4.cancel(); - Thread.sleep(sleepTime); - assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); - - dn5.cancel(); - Thread.sleep(sleepTime); - assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); - } finally { - shutdownService(service); - } + dn5.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); } @Test public void testNumberOfChoresMissingStartTime() throws InterruptedException { - ChoreService service = new ChoreService("testNumberOfChoresMissingStartTime"); - final int period = 100; final int sleepTime = 20 * period; + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); - try { - // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores - // ALWAYS miss their start time since their execution takes longer than their period - SlowChore sc1 = new SlowChore("sc1", period); - SlowChore sc2 = new SlowChore("sc2", period); - SlowChore sc3 = new SlowChore("sc3", period); - SlowChore sc4 = new SlowChore("sc4", period); - SlowChore sc5 = new SlowChore("sc5", period); + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); - service.scheduleChore(sc1); - service.scheduleChore(sc2); - service.scheduleChore(sc3); - service.scheduleChore(sc4); - service.scheduleChore(sc5); + Thread.sleep(sleepTime); + assertEquals(5, service.getNumberOfChoresMissingStartTime()); - Thread.sleep(sleepTime); - assertEquals(5, service.getNumberOfChoresMissingStartTime()); + sc1.cancel(); + Thread.sleep(sleepTime); + assertEquals(4, service.getNumberOfChoresMissingStartTime()); - sc1.cancel(); - Thread.sleep(sleepTime); - assertEquals(4, service.getNumberOfChoresMissingStartTime()); + sc2.cancel(); + sc3.cancel(); + sc4.cancel(); + Thread.sleep(sleepTime); + assertEquals(1, service.getNumberOfChoresMissingStartTime()); - sc2.cancel(); - sc3.cancel(); - sc4.cancel(); - Thread.sleep(sleepTime); - assertEquals(1, service.getNumberOfChoresMissingStartTime()); - - sc5.cancel(); - Thread.sleep(sleepTime); - assertEquals(0, service.getNumberOfChoresMissingStartTime()); - } finally { - shutdownService(service); - } + sc5.cancel(); + Thread.sleep(sleepTime); + assertEquals(0, service.getNumberOfChoresMissingStartTime()); } /** @@ -621,163 +556,145 @@ public class TestChoreService { */ @Test public void testMaximumChoreServiceThreads() throws InterruptedException { - ChoreService service = new ChoreService("testMaximumChoreServiceThreads"); final int period = 100; final int sleepTime = 5 * period; + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period. + // Chores that miss their start time will trigger the onChoreMissedStartTime callback + // in the ChoreService. This callback will try to increase the number of core pool + // threads. + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); - try { - // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores - // ALWAYS miss their start time since their execution takes longer than their period. - // Chores that miss their start time will trigger the onChoreMissedStartTime callback - // in the ChoreService. This callback will try to increase the number of core pool - // threads. - SlowChore sc1 = new SlowChore("sc1", period); - SlowChore sc2 = new SlowChore("sc2", period); - SlowChore sc3 = new SlowChore("sc3", period); - SlowChore sc4 = new SlowChore("sc4", period); - SlowChore sc5 = new SlowChore("sc5", period); + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); - service.scheduleChore(sc1); - service.scheduleChore(sc2); - service.scheduleChore(sc3); - service.scheduleChore(sc4); - service.scheduleChore(sc5); + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); - Thread.sleep(sleepTime); - assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + SlowChore sc6 = new SlowChore("sc6", period); + SlowChore sc7 = new SlowChore("sc7", period); + SlowChore sc8 = new SlowChore("sc8", period); + SlowChore sc9 = new SlowChore("sc9", period); + SlowChore sc10 = new SlowChore("sc10", period); - SlowChore sc6 = new SlowChore("sc6", period); - SlowChore sc7 = new SlowChore("sc7", period); - SlowChore sc8 = new SlowChore("sc8", period); - SlowChore sc9 = new SlowChore("sc9", period); - SlowChore sc10 = new SlowChore("sc10", period); + service.scheduleChore(sc6); + service.scheduleChore(sc7); + service.scheduleChore(sc8); + service.scheduleChore(sc9); + service.scheduleChore(sc10); - service.scheduleChore(sc6); - service.scheduleChore(sc7); - service.scheduleChore(sc8); - service.scheduleChore(sc9); - service.scheduleChore(sc10); - - Thread.sleep(sleepTime); - assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); - } finally { - shutdownService(service); - } + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); } @Test public void testChangingChoreServices() throws InterruptedException { final int period = 100; final int sleepTime = 10; - ChoreService service1 = new ChoreService("testChangingChoreServices_1"); - ChoreService service2 = new ChoreService("testChangingChoreServices_2"); + ChoreService anotherService = new ChoreService(name.getMethodName() + "_2"); ScheduledChore chore = new DoNothingChore("sample", period); try { assertFalse(chore.isScheduled()); - assertFalse(service1.isChoreScheduled(chore)); - assertFalse(service2.isChoreScheduled(chore)); - assertTrue(chore.getChoreServicer() == null); + assertFalse(service.isChoreScheduled(chore)); + assertFalse(anotherService.isChoreScheduled(chore)); + assertTrue(chore.getChoreService() == null); - service1.scheduleChore(chore); + service.scheduleChore(chore); Thread.sleep(sleepTime); assertTrue(chore.isScheduled()); - assertTrue(service1.isChoreScheduled(chore)); - assertFalse(service2.isChoreScheduled(chore)); - assertFalse(chore.getChoreServicer() == null); + assertTrue(service.isChoreScheduled(chore)); + assertFalse(anotherService.isChoreScheduled(chore)); + assertFalse(chore.getChoreService() == null); - service2.scheduleChore(chore); + anotherService.scheduleChore(chore); Thread.sleep(sleepTime); assertTrue(chore.isScheduled()); - assertFalse(service1.isChoreScheduled(chore)); - assertTrue(service2.isChoreScheduled(chore)); - assertFalse(chore.getChoreServicer() == null); + assertFalse(service.isChoreScheduled(chore)); + assertTrue(anotherService.isChoreScheduled(chore)); + assertFalse(chore.getChoreService() == null); chore.cancel(); assertFalse(chore.isScheduled()); - assertFalse(service1.isChoreScheduled(chore)); - assertFalse(service2.isChoreScheduled(chore)); - assertTrue(chore.getChoreServicer() == null); + assertFalse(service.isChoreScheduled(chore)); + assertFalse(anotherService.isChoreScheduled(chore)); + assertTrue(chore.getChoreService() == null); } finally { - shutdownService(service1); - shutdownService(service2); + shutdownService(anotherService); } } @Test public void testStopperForScheduledChores() throws InterruptedException { - ChoreService service = new ChoreService("testStopperForScheduledChores"); Stoppable stopperForGroup1 = new SampleStopper(); Stoppable stopperForGroup2 = new SampleStopper(); final int period = 100; - final int delta = period/10; + final int delta = period / 10; + ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); + ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); + ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); - try { - ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); - ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); - ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); + ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); + ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); + ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); - ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); - ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); - ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); + service.scheduleChore(chore1_group1); + service.scheduleChore(chore2_group1); + service.scheduleChore(chore3_group1); + service.scheduleChore(chore1_group2); + service.scheduleChore(chore2_group2); + service.scheduleChore(chore3_group2); - service.scheduleChore(chore1_group1); - service.scheduleChore(chore2_group1); - service.scheduleChore(chore3_group1); - service.scheduleChore(chore1_group2); - service.scheduleChore(chore2_group2); - service.scheduleChore(chore3_group2); + Thread.sleep(delta); + Thread.sleep(10 * period); + assertTrue(chore1_group1.isScheduled()); + assertTrue(chore2_group1.isScheduled()); + assertTrue(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); - Thread.sleep(delta); - Thread.sleep(10 * period); - assertTrue(chore1_group1.isScheduled()); - assertTrue(chore2_group1.isScheduled()); - assertTrue(chore3_group1.isScheduled()); - assertTrue(chore1_group2.isScheduled()); - assertTrue(chore2_group2.isScheduled()); - assertTrue(chore3_group2.isScheduled()); + stopperForGroup1.stop("test stopping group 1"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); - stopperForGroup1.stop("test stopping group 1"); - Thread.sleep(period); - assertFalse(chore1_group1.isScheduled()); - assertFalse(chore2_group1.isScheduled()); - assertFalse(chore3_group1.isScheduled()); - assertTrue(chore1_group2.isScheduled()); - assertTrue(chore2_group2.isScheduled()); - assertTrue(chore3_group2.isScheduled()); - - stopperForGroup2.stop("test stopping group 2"); - Thread.sleep(period); - assertFalse(chore1_group1.isScheduled()); - assertFalse(chore2_group1.isScheduled()); - assertFalse(chore3_group1.isScheduled()); - assertFalse(chore1_group2.isScheduled()); - assertFalse(chore2_group2.isScheduled()); - assertFalse(chore3_group2.isScheduled()); - } finally { - shutdownService(service); - } + stopperForGroup2.stop("test stopping group 2"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertFalse(chore1_group2.isScheduled()); + assertFalse(chore2_group2.isScheduled()); + assertFalse(chore3_group2.isScheduled()); } @Test public void testShutdownCancelsScheduledChores() throws InterruptedException { final int period = 100; - ChoreService service = new ChoreService("testShutdownCancelsScheduledChores"); ScheduledChore successChore1 = new DoNothingChore("sc1", period); ScheduledChore successChore2 = new DoNothingChore("sc2", period); ScheduledChore successChore3 = new DoNothingChore("sc3", period); + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); - try { - assertTrue(service.scheduleChore(successChore1)); - assertTrue(successChore1.isScheduled()); - assertTrue(service.scheduleChore(successChore2)); - assertTrue(successChore2.isScheduled()); - assertTrue(service.scheduleChore(successChore3)); - assertTrue(successChore3.isScheduled()); - } finally { - shutdownService(service); - } + shutdownService(service); assertFalse(successChore1.isScheduled()); assertFalse(successChore2.isScheduled()); @@ -788,34 +705,28 @@ public class TestChoreService { public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException { final int period = 100; final int sleep = 5 * period; - ChoreService service = new ChoreService("testShutdownWorksWhileChoresAreExecuting"); ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep); ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep); ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep); - try { - assertTrue(service.scheduleChore(slowChore1)); - assertTrue(service.scheduleChore(slowChore2)); - assertTrue(service.scheduleChore(slowChore3)); + assertTrue(service.scheduleChore(slowChore1)); + assertTrue(service.scheduleChore(slowChore2)); + assertTrue(service.scheduleChore(slowChore3)); - Thread.sleep(sleep / 2); - shutdownService(service); + Thread.sleep(sleep / 2); + shutdownService(service); - assertFalse(slowChore1.isScheduled()); - assertFalse(slowChore2.isScheduled()); - assertFalse(slowChore3.isScheduled()); - assertTrue(service.isShutdown()); + assertFalse(slowChore1.isScheduled()); + assertFalse(slowChore2.isScheduled()); + assertFalse(slowChore3.isScheduled()); + assertTrue(service.isShutdown()); - Thread.sleep(5); - assertTrue(service.isTerminated()); - } finally { - shutdownService(service); - } + Thread.sleep(5); + assertTrue(service.isTerminated()); } @Test public void testShutdownRejectsNewSchedules() throws InterruptedException { final int period = 100; - ChoreService service = new ChoreService("testShutdownRejectsNewSchedules"); ScheduledChore successChore1 = new DoNothingChore("sc1", period); ScheduledChore successChore2 = new DoNothingChore("sc2", period); ScheduledChore successChore3 = new DoNothingChore("sc3", period); @@ -823,16 +734,14 @@ public class TestChoreService { ScheduledChore failChore2 = new DoNothingChore("fc2", period); ScheduledChore failChore3 = new DoNothingChore("fc3", period); - try { - assertTrue(service.scheduleChore(successChore1)); - assertTrue(successChore1.isScheduled()); - assertTrue(service.scheduleChore(successChore2)); - assertTrue(successChore2.isScheduled()); - assertTrue(service.scheduleChore(successChore3)); - assertTrue(successChore3.isScheduled()); - } finally { - shutdownService(service); - } + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + shutdownService(service); assertFalse(service.scheduleChore(failChore1)); assertFalse(failChore1.isScheduled()); @@ -845,17 +754,38 @@ public class TestChoreService { /** * for HBASE-25014 */ - @Test(timeout = 10000) + @Test public void testInitialDelay() { - ChoreService service = new ChoreService(name.getMethodName()); SampleStopper stopper = new SampleStopper(); service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) { - @Override protected void chore() { + @Override + protected void chore() { stopper.stop("test"); } }); - while (!stopper.isStopped()) { - Threads.sleep(1000); - } + Waiter.waitFor(CONF, 5000, () -> stopper.isStopped()); + } + + @Test + public void testCleanupWithStopper() { + SampleStopper stopper = new SampleStopper(); + DoNothingChore chore = spy(new DoNothingChore("chore", stopper, 10)); + service.scheduleChore(chore); + assertTrue(chore.isScheduled()); + verify(chore, never()).cleanup(); + stopper.stop("test"); + Waiter.waitFor(CONF, 200, () -> !chore.isScheduled()); + verify(chore, atLeastOnce()).cleanup(); + } + + @Test + public void testCleanupWithShutdown() { + DoNothingChore chore = spy(new DoNothingChore("chore", 10)); + service.scheduleChore(chore); + assertTrue(chore.isScheduled()); + verify(chore, never()).cleanup(); + chore.shutdown(true); + Waiter.waitFor(CONF, 200, () -> !chore.isScheduled()); + verify(chore, atLeastOnce()).cleanup(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4d5af03e962..18419abdc29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -58,7 +58,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -1486,11 +1485,9 @@ public class HMaster extends HRegionServer implements MasterServices { try { snapshotCleanupTracker.setSnapshotCleanupEnabled(on); if (on) { - if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) { - getChoreService().scheduleChore(this.snapshotCleanerChore); - } + getChoreService().scheduleChore(this.snapshotCleanerChore); } else { - getChoreService().cancelChore(this.snapshotCleanerChore); + this.snapshotCleanerChore.cancel(); } } catch (KeeperException e) { LOG.error("Error updating snapshot cleanup mode to {}", on, e); @@ -1514,24 +1511,23 @@ public class HMaster extends HRegionServer implements MasterServices { } private void stopChores() { - ChoreService choreService = getChoreService(); - if (choreService != null) { - choreService.cancelChore(this.expiredMobFileCleanerChore); - choreService.cancelChore(this.mobCompactChore); - choreService.cancelChore(this.balancerChore); + if (getChoreService() != null) { + shutdownChore(expiredMobFileCleanerChore); + shutdownChore(expiredMobFileCleanerChore); + shutdownChore(balancerChore); if (regionNormalizerManager != null) { - choreService.cancelChore(regionNormalizerManager.getRegionNormalizerChore()); + shutdownChore(regionNormalizerManager.getRegionNormalizerChore()); } - choreService.cancelChore(this.clusterStatusChore); - choreService.cancelChore(this.catalogJanitorChore); - choreService.cancelChore(this.clusterStatusPublisherChore); - choreService.cancelChore(this.snapshotQuotaChore); - choreService.cancelChore(this.logCleaner); - choreService.cancelChore(this.hfileCleaner); - choreService.cancelChore(this.replicationBarrierCleaner); - choreService.cancelChore(this.snapshotCleanerChore); - choreService.cancelChore(this.hbckChore); - choreService.cancelChore(this.regionsRecoveryChore); + shutdownChore(clusterStatusChore); + shutdownChore(catalogJanitorChore); + shutdownChore(clusterStatusPublisherChore); + shutdownChore(snapshotQuotaChore); + shutdownChore(logCleaner); + shutdownChore(hfileCleaner); + shutdownChore(replicationBarrierCleaner); + shutdownChore(snapshotCleanerChore); + shutdownChore(hbckChore); + shutdownChore(regionsRecoveryChore); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java index 0ee5a1bf71e..184c73c1e64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HConstants; @@ -70,7 +69,6 @@ public class RegionsRecoveryChore extends ScheduledChore { */ RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration, final HMaster hMaster) { - super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt( HConstants.REGIONS_RECOVERY_INTERVAL, HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL)); this.hMaster = hMaster; @@ -125,7 +123,6 @@ public class RegionsRecoveryChore extends ScheduledChore { private Map> getTableToRegionsByRefCount( final Map serverMetricsMap) { - final Map> tableToReopenRegionsMap = new HashMap<>(); for (ServerMetrics serverMetrics : serverMetricsMap.values()) { Map regionMetricsMap = serverMetrics.getRegionMetrics(); @@ -146,13 +143,11 @@ public class RegionsRecoveryChore extends ScheduledChore { } } return tableToReopenRegionsMap; - } private void prepareTableToReopenRegionsMap( final Map> tableToReopenRegionsMap, final byte[] regionName, final int regionStoreRefCount) { - final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName); final TableName tableName = regionInfo.getTable(); if (TableName.isMetaTableName(tableName)) { @@ -166,21 +161,4 @@ public class RegionsRecoveryChore extends ScheduledChore { tableToReopenRegionsMap.get(tableName).add(regionName); } - - // hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore - // is scheduled at a time - RegionsRecoveryConfigManager - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - return o != null && getClass() == o.getClass(); - } - - @Override - public int hashCode() { - return 31; - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java index b1bfdc0ecb0..78777a18cfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master; +import com.google.errorprone.annotations.RestrictedApi; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HConstants; @@ -27,8 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Config manager for RegionsRecovery Chore - Dynamically reload config and update chore - * accordingly + * Config manager for RegionsRecovery Chore - Dynamically reload config and update chore accordingly */ @InterfaceAudience.Private public class RegionsRecoveryConfigManager implements ConfigurationObserver { @@ -36,6 +36,7 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryConfigManager.class); private final HMaster hMaster; + private RegionsRecoveryChore chore; private int prevMaxStoreFileRefCount; private int prevRegionsRecoveryInterval; @@ -51,34 +52,35 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver { final int newMaxStoreFileRefCount = getMaxStoreFileRefCount(conf); final int newRegionsRecoveryInterval = getRegionsRecoveryChoreInterval(conf); - if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount - && prevRegionsRecoveryInterval == newRegionsRecoveryInterval) { + if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount && + prevRegionsRecoveryInterval == newRegionsRecoveryInterval) { // no need to re-schedule the chore with updated config // as there is no change in desired configs return; } - LOG.info("Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," + + LOG.info( + "Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," + " newMaxStoreFileRefCount: {}, prevRegionsRecoveryInterval: {}, " + - "newRegionsRecoveryInterval: {}", prevMaxStoreFileRefCount, newMaxStoreFileRefCount, - prevRegionsRecoveryInterval, newRegionsRecoveryInterval); + "newRegionsRecoveryInterval: {}", + prevMaxStoreFileRefCount, newMaxStoreFileRefCount, prevRegionsRecoveryInterval, + newRegionsRecoveryInterval); - RegionsRecoveryChore regionsRecoveryChore = new RegionsRecoveryChore(this.hMaster, - conf, this.hMaster); + RegionsRecoveryChore regionsRecoveryChore = + new RegionsRecoveryChore(this.hMaster, conf, this.hMaster); ChoreService choreService = this.hMaster.getChoreService(); // Regions Reopen based on very high storeFileRefCount is considered enabled // only if hbase.regions.recovery.store.file.ref.count has value > 0 - synchronized (this) { + if (chore != null) { + chore.shutdown(); + chore = null; + } if (newMaxStoreFileRefCount > 0) { - // reschedule the chore - // provide mayInterruptIfRunning - false to take care of completion - // of in progress task if any - choreService.cancelChore(regionsRecoveryChore, false); + // schedule the new chore choreService.scheduleChore(regionsRecoveryChore); - } else { - choreService.cancelChore(regionsRecoveryChore, false); + chore = regionsRecoveryChore; } this.prevMaxStoreFileRefCount = newMaxStoreFileRefCount; this.prevRegionsRecoveryInterval = newRegionsRecoveryInterval; @@ -86,15 +88,18 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver { } private int getMaxStoreFileRefCount(Configuration configuration) { - return configuration.getInt( - HConstants.STORE_FILE_REF_COUNT_THRESHOLD, + return configuration.getInt(HConstants.STORE_FILE_REF_COUNT_THRESHOLD, HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD); } private int getRegionsRecoveryChoreInterval(Configuration configuration) { - return configuration.getInt( - HConstants.REGIONS_RECOVERY_INTERVAL, + return configuration.getInt(HConstants.REGIONS_RECOVERY_INTERVAL, HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL); } + @RestrictedApi(explanation = "Only visible for testing", link = "", + allowedOnPath = ".*/src/test/.*") + RegionsRecoveryChore getChore() { + return chore; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index f628841cb4f..186a8ff11bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -456,7 +456,7 @@ public class SplitLogManager { choreService.shutdown(); } if (timeoutMonitor != null) { - timeoutMonitor.cancel(true); + timeoutMonitor.shutdown(true); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 92267aeacb3..72e1f680092 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -100,7 +100,7 @@ public class QuotaCache implements Stoppable { public void stop(final String why) { if (refreshChore != null) { LOG.debug("Stopping QuotaRefresherChore chore."); - refreshChore.cancel(true); + refreshChore.shutdown(true); } stopped = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java index 07c6a197566..db55cac2fd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java @@ -98,11 +98,11 @@ public class RegionServerSpaceQuotaManager { public synchronized void stop() { if (spaceQuotaRefresher != null) { - spaceQuotaRefresher.cancel(); + spaceQuotaRefresher.shutdown(); spaceQuotaRefresher = null; } if (regionSizeReporter != null) { - regionSizeReporter.cancel(); + regionSizeReporter.shutdown(); regionSizeReporter = null; } started = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a443e7329e3..b55f1224ae0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPL import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; + import java.io.IOException; import java.lang.management.MemoryType; import java.lang.management.MemoryUsage; @@ -2625,6 +2626,11 @@ public class HRegionServer extends Thread implements } } + protected final void shutdownChore(ScheduledChore chore) { + if (chore != null) { + chore.shutdown(); + } + } /** * Wait on all threads to finish. Presumption is that all closes and stops * have already been called. @@ -2632,14 +2638,15 @@ public class HRegionServer extends Thread implements protected void stopServiceThreads() { // clean up the scheduled chores if (this.choreService != null) { - choreService.cancelChore(nonceManagerChore); - choreService.cancelChore(compactionChecker); - choreService.cancelChore(periodicFlusher); - choreService.cancelChore(healthCheckChore); - choreService.cancelChore(storefileRefresher); - choreService.cancelChore(fsUtilizationChore); - choreService.cancelChore(slowLogTableOpsChore); - // clean up the remaining scheduled chores (in case we missed out any) + shutdownChore(nonceManagerChore); + shutdownChore(compactionChecker); + shutdownChore(periodicFlusher); + shutdownChore(healthCheckChore); + shutdownChore(storefileRefresher); + shutdownChore(fsUtilizationChore); + shutdownChore(slowLogTableOpsChore); + // cancel the remaining scheduled chores (in case we missed out any) + // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any choreService.shutdown(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 1f831eefee6..342ec18e1ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -216,7 +216,7 @@ public class HeapMemoryManager { public void stop() { // The thread is Daemon. Just interrupting the ongoing process. LOG.info("Stopping"); - this.heapMemTunerChore.cancel(true); + this.heapMemTunerChore.shutdown(true); } public void registerTuneObserver(HeapMemoryTuneObserver observer) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java index d29e061d07f..6819e5d2b11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java @@ -18,18 +18,18 @@ package org.apache.hadoop.hbase.master; -import java.io.IOException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.StartMiniClusterOption; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -38,7 +38,7 @@ import org.junit.experimental.categories.Category; /** * Test for Regions Recovery Config Manager */ -@Category({MasterTests.class, MediumTests.class}) +@Category({ MasterTests.class, MediumTests.class }) public class TestRegionsRecoveryConfigManager { @ClassRule @@ -51,8 +51,6 @@ public class TestRegionsRecoveryConfigManager { private HMaster hMaster; - private RegionsRecoveryChore regionsRecoveryChore; - private RegionsRecoveryConfigManager regionsRecoveryConfigManager; private Configuration conf; @@ -62,10 +60,8 @@ public class TestRegionsRecoveryConfigManager { conf = HBASE_TESTING_UTILITY.getConfiguration(); conf.unset("hbase.regions.recovery.store.file.ref.count"); conf.unset("hbase.master.regions.recovery.check.interval"); - StartMiniClusterOption option = StartMiniClusterOption.builder() - .masterClass(TestHMaster.class) - .numRegionServers(1) - .numDataNodes(1).build(); + StartMiniClusterOption option = StartMiniClusterOption.builder().masterClass(TestHMaster.class) + .numRegionServers(1).numDataNodes(1).build(); HBASE_TESTING_UTILITY.startMiniCluster(option); cluster = HBASE_TESTING_UTILITY.getMiniHBaseCluster(); } @@ -77,44 +73,44 @@ public class TestRegionsRecoveryConfigManager { @Test public void testChoreSchedule() throws Exception { - this.hMaster = cluster.getMaster(); - Stoppable stoppable = new StoppableImplementation(); - this.regionsRecoveryChore = new RegionsRecoveryChore(stoppable, conf, hMaster); - this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this.hMaster); // not yet scheduled - Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertFalse( + hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // not yet scheduled - Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertFalse( + hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); conf.setInt("hbase.master.regions.recovery.check.interval", 10); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // not yet scheduled - missing config: hbase.regions.recovery.store.file.ref.count - Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertFalse( + hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); conf.setInt("hbase.regions.recovery.store.file.ref.count", 10); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // chore scheduled - Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); conf.setInt("hbase.regions.recovery.store.file.ref.count", 20); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // chore re-scheduled - Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); conf.setInt("hbase.regions.recovery.store.file.ref.count", 20); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // chore scheduling untouched - Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); conf.unset("hbase.regions.recovery.store.file.ref.count"); this.regionsRecoveryConfigManager.onConfigurationChange(conf); // chore un-scheduled - Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore)); + assertFalse( + hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore())); } // Make it public so that JVMClusterUtil can access it. @@ -123,24 +119,4 @@ public class TestRegionsRecoveryConfigManager { super(conf); } } - - /** - * Simple helper class that just keeps track of whether or not its stopped. - */ - private static class StoppableImplementation implements Stoppable { - - private boolean stop = false; - - @Override - public void stop(String why) { - this.stop = true; - } - - @Override - public boolean isStopped() { - return this.stop; - } - - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java index fed7f9326ef..8a99e65a29e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java @@ -110,7 +110,7 @@ public class TestCatalogJanitor { @After public void teardown() { - this.janitor.cancel(true); + this.janitor.shutdown(true); this.masterServices.stop("DONE"); }