HBASE-25509 ChoreService.cancelChore will not call ScheduledChore.cle… (#2890)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
871eb09b3d
commit
a37e727990
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
@ -26,8 +27,6 @@ import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
|
* Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
public class ChoreService implements ChoreServicer {
|
public class ChoreService {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -141,30 +140,41 @@ public class ChoreService implements ChoreServicer {
|
||||||
* @return true when the chore was successfully scheduled. false when the scheduling failed
|
* @return true when the chore was successfully scheduled. false when the scheduling failed
|
||||||
* (typically occurs when a chore is scheduled during shutdown of service)
|
* (typically occurs when a chore is scheduled during shutdown of service)
|
||||||
*/
|
*/
|
||||||
public synchronized boolean scheduleChore(ScheduledChore chore) {
|
public boolean scheduleChore(ScheduledChore chore) {
|
||||||
if (chore == null) {
|
if (chore == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// always lock chore first to prevent dead lock
|
||||||
|
synchronized (chore) {
|
||||||
|
synchronized (this) {
|
||||||
try {
|
try {
|
||||||
|
// Chores should only ever be scheduled with a single ChoreService. If the choreService
|
||||||
|
// is changing, cancel any existing schedules of this chore.
|
||||||
|
if (chore.getChoreService() == this) {
|
||||||
|
LOG.warn("Chore {} has already been scheduled with us", chore);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (chore.getPeriod() <= 0) {
|
if (chore.getPeriod() <= 0) {
|
||||||
LOG.info("Chore {} is disabled because its period is not positive.", chore);
|
LOG.info("Chore {} is disabled because its period is not positive.", chore);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
LOG.info("Chore {} is enabled.", chore);
|
LOG.info("Chore {} is enabled.", chore);
|
||||||
chore.setChoreServicer(this);
|
if (chore.getChoreService() != null) {
|
||||||
ScheduledFuture<?> future =
|
LOG.info("Cancel chore {} from its previous service", chore);
|
||||||
scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
|
chore.getChoreService().cancelChore(chore);
|
||||||
chore.getTimeUnit());
|
}
|
||||||
|
chore.setChoreService(this);
|
||||||
|
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
|
||||||
|
chore.getPeriod(), chore.getTimeUnit());
|
||||||
scheduledChores.put(chore, future);
|
scheduledChores.put(chore, future);
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception exception) {
|
} catch (Exception e) {
|
||||||
if (LOG.isInfoEnabled()) {
|
LOG.error("Could not successfully schedule chore: {}", chore.getName(), e);
|
||||||
LOG.info("Could not successfully schedule chore: " + chore.getName());
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
|
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
|
||||||
|
@ -175,19 +185,35 @@ public class ChoreService implements ChoreServicer {
|
||||||
ScheduledFuture<?> future = scheduledChores.get(chore);
|
ScheduledFuture<?> future = scheduledChores.get(chore);
|
||||||
future.cancel(false);
|
future.cancel(false);
|
||||||
}
|
}
|
||||||
scheduleChore(chore);
|
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
|
||||||
|
chore.getPeriod(), chore.getTimeUnit());
|
||||||
|
scheduledChores.put(chore, future);
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
/**
|
||||||
@Override
|
* Cancel any ongoing schedules that this chore has with the implementer of this interface.
|
||||||
public synchronized void cancelChore(ScheduledChore chore) {
|
* <p/>
|
||||||
|
* Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in
|
||||||
|
* {@link ScheduledChore#cancel()} method we will call this method to remove the
|
||||||
|
* {@link ScheduledChore} from this {@link ChoreService}.
|
||||||
|
*/
|
||||||
|
@RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
|
||||||
|
allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
|
||||||
|
synchronized void cancelChore(ScheduledChore chore) {
|
||||||
cancelChore(chore, true);
|
cancelChore(chore, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
/**
|
||||||
@Override
|
* Cancel any ongoing schedules that this chore has with the implementer of this interface.
|
||||||
public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
|
* <p/>
|
||||||
if (chore != null && scheduledChores.containsKey(chore)) {
|
* Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in
|
||||||
|
* {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the
|
||||||
|
* {@link ScheduledChore} from this {@link ChoreService}.
|
||||||
|
*/
|
||||||
|
@RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
|
||||||
|
allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
|
||||||
|
synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
|
||||||
|
if (scheduledChores.containsKey(chore)) {
|
||||||
ScheduledFuture<?> future = scheduledChores.get(chore);
|
ScheduledFuture<?> future = scheduledChores.get(chore);
|
||||||
future.cancel(mayInterruptIfRunning);
|
future.cancel(mayInterruptIfRunning);
|
||||||
scheduledChores.remove(chore);
|
scheduledChores.remove(chore);
|
||||||
|
@ -201,21 +227,24 @@ public class ChoreService implements ChoreServicer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true when the chore is scheduled with the implementer of this interface
|
||||||
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Override
|
|
||||||
public synchronized boolean isChoreScheduled(ScheduledChore chore) {
|
public synchronized boolean isChoreScheduled(ScheduledChore chore) {
|
||||||
return chore != null && scheduledChores.containsKey(chore)
|
return chore != null && scheduledChores.containsKey(chore)
|
||||||
&& !scheduledChores.get(chore).isDone();
|
&& !scheduledChores.get(chore).isDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
/**
|
||||||
@Override
|
* This method tries to execute the chore immediately. If the chore is executing at the time of
|
||||||
public synchronized boolean triggerNow(ScheduledChore chore) {
|
* this call, the chore will begin another execution as soon as the current execution finishes
|
||||||
if (chore != null) {
|
*/
|
||||||
|
@RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
|
||||||
|
allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
|
||||||
|
synchronized void triggerNow(ScheduledChore chore) {
|
||||||
|
assert chore.getChoreService() == this;
|
||||||
rescheduleChore(chore);
|
rescheduleChore(chore);
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -295,10 +324,20 @@ public class ChoreService implements ChoreServicer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
/**
|
||||||
@Override
|
* A callback that tells the implementer of this interface that one of the scheduled chores is
|
||||||
public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
|
* missing its start time. The implication of a chore missing its start time is that the service's
|
||||||
if (chore == null || !scheduledChores.containsKey(chore)) return;
|
* current means of scheduling may not be sufficient to handle the number of ongoing chores (the
|
||||||
|
* other explanation is that the chore's execution time is greater than its scheduled period). The
|
||||||
|
* service should try to increase its concurrency when this callback is received.
|
||||||
|
* @param chore The chore that missed its start time
|
||||||
|
*/
|
||||||
|
@RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
|
||||||
|
allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
|
||||||
|
synchronized void onChoreMissedStartTime(ScheduledChore chore) {
|
||||||
|
if (!scheduledChores.containsKey(chore)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// If the chore has not caused an increase in the size of the core thread pool then request an
|
// If the chore has not caused an increase in the size of the core thread pool then request an
|
||||||
// increase. This allows each chore missing its start time to increase the core pool size by
|
// increase. This allows each chore missing its start time to increase the core pool size by
|
||||||
|
@ -319,13 +358,17 @@ public class ChoreService implements ChoreServicer {
|
||||||
* shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
|
* shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
|
||||||
* in the middle of execution will be interrupted and shutdown. This service will be unusable
|
* in the middle of execution will be interrupted and shutdown. This service will be unusable
|
||||||
* after this method has been called (i.e. future scheduling attempts will fail).
|
* after this method has been called (i.e. future scheduling attempts will fail).
|
||||||
|
* <p/>
|
||||||
|
* Notice that, this will only clean the chore from this ChoreService but you could still schedule
|
||||||
|
* the chore with other ChoreService.
|
||||||
*/
|
*/
|
||||||
public synchronized void shutdown() {
|
public synchronized void shutdown() {
|
||||||
scheduler.shutdownNow();
|
if (isShutdown()) {
|
||||||
if (LOG.isInfoEnabled()) {
|
return;
|
||||||
LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
|
|
||||||
+ " on shutdown");
|
|
||||||
}
|
}
|
||||||
|
scheduler.shutdownNow();
|
||||||
|
LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix,
|
||||||
|
scheduledChores.keySet());
|
||||||
cancelAllChores(true);
|
cancelAllChores(true);
|
||||||
scheduledChores.clear();
|
scheduledChores.clear();
|
||||||
choresMissingStartTime.clear();
|
choresMissingStartTime.clear();
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* execute within the defined period. It is bad practice to define a ScheduledChore whose execution
|
* execute within the defined period. It is bad practice to define a ScheduledChore whose execution
|
||||||
* time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
|
* time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
|
||||||
* thread pool.
|
* thread pool.
|
||||||
* <p>
|
* <p/>
|
||||||
* Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
|
* Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
|
||||||
* an entry being added to a queue, etc.
|
* an entry being added to a queue, etc.
|
||||||
*/
|
*/
|
||||||
|
@ -60,7 +61,7 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
* Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
|
* Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
|
||||||
* not scheduled.
|
* not scheduled.
|
||||||
*/
|
*/
|
||||||
private ChoreServicer choreServicer;
|
private ChoreService choreService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Variables that encapsulate the meaningful state information
|
* Variables that encapsulate the meaningful state information
|
||||||
|
@ -77,39 +78,6 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
*/
|
*/
|
||||||
private final Stoppable stopper;
|
private final Stoppable stopper;
|
||||||
|
|
||||||
interface ChoreServicer {
|
|
||||||
/**
|
|
||||||
* Cancel any ongoing schedules that this chore has with the implementer of this interface.
|
|
||||||
*/
|
|
||||||
public void cancelChore(ScheduledChore chore);
|
|
||||||
public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true when the chore is scheduled with the implementer of this interface
|
|
||||||
*/
|
|
||||||
public boolean isChoreScheduled(ScheduledChore chore);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method tries to execute the chore immediately. If the chore is executing at the time of
|
|
||||||
* this call, the chore will begin another execution as soon as the current execution finishes
|
|
||||||
* <p>
|
|
||||||
* If the chore is not scheduled with a ChoreService, this call will fail.
|
|
||||||
* @return false when the chore could not be triggered immediately
|
|
||||||
*/
|
|
||||||
public boolean triggerNow(ScheduledChore chore);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A callback that tells the implementer of this interface that one of the scheduled chores is
|
|
||||||
* missing its start time. The implication of a chore missing its start time is that the
|
|
||||||
* service's current means of scheduling may not be sufficient to handle the number of ongoing
|
|
||||||
* chores (the other explanation is that the chore's execution time is greater than its
|
|
||||||
* scheduled period). The service should try to increase its concurrency when this callback is
|
|
||||||
* received.
|
|
||||||
* @param chore The chore that missed its start time
|
|
||||||
*/
|
|
||||||
public void onChoreMissedStartTime(ScheduledChore chore);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This constructor is for test only. It allows us to create an object and to call chore() on it.
|
* This constructor is for test only. It allows us to create an object and to call chore() on it.
|
||||||
*/
|
*/
|
||||||
|
@ -168,8 +136,8 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
onChoreMissedStartTime();
|
onChoreMissedStartTime();
|
||||||
LOG.info("Chore: {} missed its start time", getName());
|
LOG.info("Chore: {} missed its start time", getName());
|
||||||
} else if (stopper.isStopped() || !isScheduled()) {
|
} else if (stopper.isStopped() || !isScheduled()) {
|
||||||
cancel(false);
|
// call shutdown here to cleanup the ScheduledChore.
|
||||||
cleanup();
|
shutdown(false);
|
||||||
LOG.info("Chore: {} was stopped", getName());
|
LOG.info("Chore: {} was stopped", getName());
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
|
@ -193,7 +161,6 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
LOG.error("Caught error", t);
|
LOG.error("Caught error", t);
|
||||||
if (this.stopper.isStopped()) {
|
if (this.stopper.isStopped()) {
|
||||||
cancel(false);
|
cancel(false);
|
||||||
cleanup();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,7 +181,9 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
* pool threads
|
* pool threads
|
||||||
*/
|
*/
|
||||||
private synchronized void onChoreMissedStartTime() {
|
private synchronized void onChoreMissedStartTime() {
|
||||||
if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
|
if (choreService != null) {
|
||||||
|
choreService.onChoreMissedStartTime(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -253,20 +222,17 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
* @return false when the Chore is not currently scheduled with a ChoreService
|
* @return false when the Chore is not currently scheduled with a ChoreService
|
||||||
*/
|
*/
|
||||||
public synchronized boolean triggerNow() {
|
public synchronized boolean triggerNow() {
|
||||||
if (choreServicer != null) {
|
if (choreService == null) {
|
||||||
return choreServicer.triggerNow(this);
|
|
||||||
} else {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
choreService.triggerNow(this);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void setChoreServicer(ChoreServicer service) {
|
@RestrictedApi(explanation = "Should only be called in ChoreService", link = "",
|
||||||
// Chores should only ever be scheduled with a single ChoreService. If the choreServicer
|
allowedOnPath = ".*/org/apache/hadoop/hbase/ChoreService.java")
|
||||||
// is changing, cancel any existing schedules of this chore.
|
synchronized void setChoreService(ChoreService service) {
|
||||||
if (choreServicer != null && choreServicer != service) {
|
choreService = service;
|
||||||
choreServicer.cancelChore(this, false);
|
|
||||||
}
|
|
||||||
choreServicer = service;
|
|
||||||
timeOfThisRun = -1;
|
timeOfThisRun = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,9 +241,10 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void cancel(boolean mayInterruptIfRunning) {
|
public synchronized void cancel(boolean mayInterruptIfRunning) {
|
||||||
if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
|
if (isScheduled()) {
|
||||||
|
choreService.cancelChore(this, mayInterruptIfRunning);
|
||||||
choreServicer = null;
|
}
|
||||||
|
choreService = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
|
@ -310,17 +277,14 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
return initialChoreComplete;
|
return initialChoreComplete;
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
synchronized ChoreService getChoreService() {
|
||||||
synchronized ChoreServicer getChoreServicer() {
|
return choreService;
|
||||||
return choreServicer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
synchronized long getTimeOfLastRun() {
|
synchronized long getTimeOfLastRun() {
|
||||||
return timeOfLastRun;
|
return timeOfLastRun;
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
synchronized long getTimeOfThisRun() {
|
synchronized long getTimeOfThisRun() {
|
||||||
return timeOfThisRun;
|
return timeOfThisRun;
|
||||||
}
|
}
|
||||||
|
@ -329,10 +293,12 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
* @return true when this Chore is scheduled with a ChoreService
|
* @return true when this Chore is scheduled with a ChoreService
|
||||||
*/
|
*/
|
||||||
public synchronized boolean isScheduled() {
|
public synchronized boolean isScheduled() {
|
||||||
return choreServicer != null && choreServicer.isChoreScheduled(this);
|
return choreService != null && choreService.isChoreScheduled(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
|
allowedOnPath = ".*/src/test/.*")
|
||||||
public synchronized void choreForTesting() {
|
public synchronized void choreForTesting() {
|
||||||
chore();
|
chore();
|
||||||
}
|
}
|
||||||
|
@ -354,7 +320,26 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Override to run cleanup tasks when the Chore encounters an error and must stop running
|
* Override to run cleanup tasks when the Chore encounters an error and must stop running
|
||||||
*/
|
*/
|
||||||
protected synchronized void cleanup() {
|
protected void cleanup() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call {@link #shutdown(boolean)} with {@code true}.
|
||||||
|
* @see ScheduledChore#shutdown(boolean)
|
||||||
|
*/
|
||||||
|
public synchronized void shutdown() {
|
||||||
|
shutdown(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Completely shutdown the ScheduleChore, which means we will call cleanup and you should not
|
||||||
|
* schedule it again.
|
||||||
|
* <p/>
|
||||||
|
* This is another path to cleanup the chore, comparing to stop the stopper instance passed in.
|
||||||
|
*/
|
||||||
|
public synchronized void shutdown(boolean mayInterruptIfRunning) {
|
||||||
|
cancel(mayInterruptIfRunning);
|
||||||
|
cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -20,16 +20,18 @@ package org.apache.hadoop.hbase;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore;
|
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore;
|
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper;
|
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore;
|
|
||||||
import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -38,26 +40,38 @@ import org.junit.rules.TestName;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
@Category({ MiscTests.class, MediumTests.class })
|
||||||
public class TestChoreService {
|
public class TestChoreService {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestChoreService.class);
|
HBaseClassTestRule.forClass(TestChoreService.class);
|
||||||
|
|
||||||
public static final Logger log = LoggerFactory.getLogger(TestChoreService.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestChoreService.class);
|
||||||
|
|
||||||
|
private static final Configuration CONF = HBaseConfiguration.create();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
/**
|
private int initialCorePoolSize = 3;
|
||||||
* A few ScheduledChore samples that are useful for testing with ChoreService
|
|
||||||
*/
|
private ChoreService service;
|
||||||
public static class ScheduledChoreSamples {
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
service = new ChoreService(name.getMethodName(), initialCorePoolSize, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
shutdownService(service);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Straight forward stopper implementation that is used by default when one is not provided
|
* Straight forward stopper implementation that is used by default when one is not provided
|
||||||
*/
|
*/
|
||||||
public static class SampleStopper implements Stoppable {
|
private static class SampleStopper implements Stoppable {
|
||||||
private boolean stopped = false;
|
private boolean stopped = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -75,7 +89,7 @@ public class TestChoreService {
|
||||||
* Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
|
* Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
|
||||||
* executions
|
* executions
|
||||||
*/
|
*/
|
||||||
public static class SlowChore extends ScheduledChore {
|
private static class SlowChore extends ScheduledChore {
|
||||||
public SlowChore(String name, int period) {
|
public SlowChore(String name, int period) {
|
||||||
this(name, new SampleStopper(), period);
|
this(name, new SampleStopper(), period);
|
||||||
}
|
}
|
||||||
|
@ -86,28 +100,21 @@ public class TestChoreService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean initialChore() {
|
protected boolean initialChore() {
|
||||||
try {
|
Threads.sleep(getPeriod() * 2);
|
||||||
Thread.sleep(getPeriod() * 2);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.warn("", e);
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
try {
|
Threads.sleep(getPeriod() * 2);
|
||||||
Thread.sleep(getPeriod() * 2);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.warn("", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
|
* Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
|
||||||
*/
|
*/
|
||||||
public static class DoNothingChore extends ScheduledChore {
|
private static class DoNothingChore extends ScheduledChore {
|
||||||
|
|
||||||
public DoNothingChore(String name, int period) {
|
public DoNothingChore(String name, int period) {
|
||||||
super(name, new SampleStopper(), period);
|
super(name, new SampleStopper(), period);
|
||||||
}
|
}
|
||||||
|
@ -120,10 +127,9 @@ public class TestChoreService {
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
// DO NOTHING
|
// DO NOTHING
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SleepingChore extends ScheduledChore {
|
private static class SleepingChore extends ScheduledChore {
|
||||||
private int sleepTime;
|
private int sleepTime;
|
||||||
|
|
||||||
public SleepingChore(String name, int chorePeriod, int sleepTime) {
|
public SleepingChore(String name, int chorePeriod, int sleepTime) {
|
||||||
|
@ -137,25 +143,17 @@ public class TestChoreService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean initialChore() {
|
protected boolean initialChore() {
|
||||||
try {
|
Threads.sleep(sleepTime);
|
||||||
Thread.sleep(sleepTime);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.warn("", e);
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
try {
|
Threads.sleep(sleepTime);
|
||||||
Thread.sleep(sleepTime);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class CountingChore extends ScheduledChore {
|
private static class CountingChore extends ScheduledChore {
|
||||||
private int countOfChoreCalls;
|
private int countOfChoreCalls;
|
||||||
private boolean outputOnTicks = false;
|
private boolean outputOnTicks = false;
|
||||||
|
|
||||||
|
@ -167,8 +165,7 @@ public class TestChoreService {
|
||||||
this(name, stopper, period, false);
|
this(name, stopper, period, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CountingChore(String name, Stoppable stopper, int period,
|
public CountingChore(String name, Stoppable stopper, int period, final boolean outputOnTicks) {
|
||||||
final boolean outputOnTicks) {
|
|
||||||
super(name, stopper, period);
|
super(name, stopper, period);
|
||||||
this.countOfChoreCalls = 0;
|
this.countOfChoreCalls = 0;
|
||||||
this.outputOnTicks = outputOnTicks;
|
this.outputOnTicks = outputOnTicks;
|
||||||
|
@ -192,20 +189,12 @@ public class TestChoreService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void outputTickCount() {
|
private void outputTickCount() {
|
||||||
log.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
|
LOG.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCountOfChoreCalls() {
|
public int getCountOfChoreCalls() {
|
||||||
return countOfChoreCalls;
|
return countOfChoreCalls;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isOutputtingOnTicks() {
|
|
||||||
return outputOnTicks;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOutputOnTicks(boolean o) {
|
|
||||||
outputOnTicks = o;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -245,18 +234,12 @@ public class TestChoreService {
|
||||||
assertTrue(numberOfFailures == failureThreshold);
|
assertTrue(numberOfFailures == failureThreshold);
|
||||||
cancel(false);
|
cancel(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInitialChorePrecedence() throws InterruptedException {
|
public void testInitialChorePrecedence() throws InterruptedException {
|
||||||
ChoreService service = new ChoreService("testInitialChorePrecedence");
|
|
||||||
|
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int failureThreshold = 5;
|
final int failureThreshold = 5;
|
||||||
|
|
||||||
try {
|
|
||||||
ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
|
ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
|
||||||
service.scheduleChore(chore);
|
service.scheduleChore(chore);
|
||||||
|
|
||||||
|
@ -273,26 +256,18 @@ public class TestChoreService {
|
||||||
}
|
}
|
||||||
|
|
||||||
assertFalse(brokeOutOfLoop);
|
assertFalse(brokeOutOfLoop);
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelChore() throws InterruptedException {
|
public void testCancelChore() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
ScheduledChore chore1 = new DoNothingChore("chore1", period);
|
ScheduledChore chore = new DoNothingChore("chore", period);
|
||||||
ChoreService service = new ChoreService("testCancelChore");
|
service.scheduleChore(chore);
|
||||||
try {
|
assertTrue(chore.isScheduled());
|
||||||
service.scheduleChore(chore1);
|
|
||||||
assertTrue(chore1.isScheduled());
|
|
||||||
|
|
||||||
chore1.cancel(true);
|
chore.cancel(true);
|
||||||
assertFalse(chore1.isScheduled());
|
assertFalse(chore.isScheduled());
|
||||||
assertTrue(service.getNumberOfScheduledChores() == 0);
|
assertTrue(service.getNumberOfScheduledChores() == 0);
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -361,9 +336,7 @@ public class TestChoreService {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
// Small delta that acts as time buffer (allowing chores to complete if running slowly)
|
// Small delta that acts as time buffer (allowing chores to complete if running slowly)
|
||||||
final int delta = period / 5;
|
final int delta = period / 5;
|
||||||
ChoreService service = new ChoreService("testFrequencyOfChores");
|
|
||||||
CountingChore chore = new CountingChore("countingChore", period);
|
CountingChore chore = new CountingChore("countingChore", period);
|
||||||
try {
|
|
||||||
service.scheduleChore(chore);
|
service.scheduleChore(chore);
|
||||||
|
|
||||||
Thread.sleep(10 * period + delta);
|
Thread.sleep(10 * period + delta);
|
||||||
|
@ -371,25 +344,18 @@ public class TestChoreService {
|
||||||
|
|
||||||
Thread.sleep(10 * period + delta);
|
Thread.sleep(10 * period + delta);
|
||||||
assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls());
|
assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdownService(ChoreService service) throws InterruptedException {
|
public void shutdownService(ChoreService service) {
|
||||||
service.shutdown();
|
service.shutdown();
|
||||||
while (!service.isTerminated()) {
|
Waiter.waitFor(CONF, 1000, () -> service.isTerminated());
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testForceTrigger() throws InterruptedException {
|
public void testForceTrigger() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int delta = period / 10;
|
final int delta = period / 10;
|
||||||
ChoreService service = new ChoreService("testForceTrigger");
|
|
||||||
final CountingChore chore = new CountingChore("countingChore", period);
|
final CountingChore chore = new CountingChore("countingChore", period);
|
||||||
try {
|
|
||||||
service.scheduleChore(chore);
|
service.scheduleChore(chore);
|
||||||
Thread.sleep(10 * period + delta);
|
Thread.sleep(10 * period + delta);
|
||||||
|
|
||||||
|
@ -408,25 +374,17 @@ public class TestChoreService {
|
||||||
chore.triggerNow();
|
chore.triggerNow();
|
||||||
Thread.sleep(delta);
|
Thread.sleep(delta);
|
||||||
|
|
||||||
assertEquals("Trigger was called 5 times after 10 periods.", 16,
|
assertEquals("Trigger was called 5 times after 10 periods.", 16, chore.getCountOfChoreCalls());
|
||||||
chore.getCountOfChoreCalls());
|
|
||||||
|
|
||||||
Thread.sleep(10 * period + delta);
|
Thread.sleep(10 * period + delta);
|
||||||
|
|
||||||
// Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing.
|
// Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing.
|
||||||
assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(),
|
assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(),
|
||||||
chore.getCountOfChoreCalls() > 16);
|
chore.getCountOfChoreCalls() > 16);
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorePoolIncrease() throws InterruptedException {
|
public void testCorePoolIncrease() throws InterruptedException {
|
||||||
final int initialCorePoolSize = 3;
|
|
||||||
ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false);
|
|
||||||
|
|
||||||
try {
|
|
||||||
assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize,
|
assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize,
|
||||||
service.getCorePoolSize());
|
service.getCorePoolSize());
|
||||||
|
|
||||||
|
@ -456,17 +414,11 @@ public class TestChoreService {
|
||||||
Thread.sleep(slowChorePeriod * 10);
|
Thread.sleep(slowChorePeriod * 10);
|
||||||
assertEquals("Chores are missing their start time. Should expand core pool size", 5,
|
assertEquals("Chores are missing their start time. Should expand core pool size", 5,
|
||||||
service.getCorePoolSize());
|
service.getCorePoolSize());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorePoolDecrease() throws InterruptedException {
|
public void testCorePoolDecrease() throws InterruptedException {
|
||||||
final int initialCorePoolSize = 3;
|
|
||||||
ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false);
|
|
||||||
final int chorePeriod = 100;
|
final int chorePeriod = 100;
|
||||||
try {
|
|
||||||
// Slow chores always miss their start time and thus the core pool size should be at least as
|
// Slow chores always miss their start time and thus the core pool size should be at least as
|
||||||
// large as the number of running slow chores
|
// large as the number of running slow chores
|
||||||
SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
|
SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
|
||||||
|
@ -525,19 +477,12 @@ public class TestChoreService {
|
||||||
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
|
||||||
service.getCorePoolSize());
|
service.getCorePoolSize());
|
||||||
assertEquals(0, service.getNumberOfChoresMissingStartTime());
|
assertEquals(0, service.getNumberOfChoresMissingStartTime());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNumberOfRunningChores() throws InterruptedException {
|
public void testNumberOfRunningChores() throws InterruptedException {
|
||||||
ChoreService service = new ChoreService("testNumberOfRunningChores");
|
|
||||||
|
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int sleepTime = 5;
|
final int sleepTime = 5;
|
||||||
|
|
||||||
try {
|
|
||||||
DoNothingChore dn1 = new DoNothingChore("dn1", period);
|
DoNothingChore dn1 = new DoNothingChore("dn1", period);
|
||||||
DoNothingChore dn2 = new DoNothingChore("dn2", period);
|
DoNothingChore dn2 = new DoNothingChore("dn2", period);
|
||||||
DoNothingChore dn3 = new DoNothingChore("dn3", period);
|
DoNothingChore dn3 = new DoNothingChore("dn3", period);
|
||||||
|
@ -566,19 +511,12 @@ public class TestChoreService {
|
||||||
dn5.cancel();
|
dn5.cancel();
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
|
assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNumberOfChoresMissingStartTime() throws InterruptedException {
|
public void testNumberOfChoresMissingStartTime() throws InterruptedException {
|
||||||
ChoreService service = new ChoreService("testNumberOfChoresMissingStartTime");
|
|
||||||
|
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int sleepTime = 20 * period;
|
final int sleepTime = 20 * period;
|
||||||
|
|
||||||
try {
|
|
||||||
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
||||||
// ALWAYS miss their start time since their execution takes longer than their period
|
// ALWAYS miss their start time since their execution takes longer than their period
|
||||||
SlowChore sc1 = new SlowChore("sc1", period);
|
SlowChore sc1 = new SlowChore("sc1", period);
|
||||||
|
@ -609,9 +547,6 @@ public class TestChoreService {
|
||||||
sc5.cancel();
|
sc5.cancel();
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertEquals(0, service.getNumberOfChoresMissingStartTime());
|
assertEquals(0, service.getNumberOfChoresMissingStartTime());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -621,12 +556,9 @@ public class TestChoreService {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testMaximumChoreServiceThreads() throws InterruptedException {
|
public void testMaximumChoreServiceThreads() throws InterruptedException {
|
||||||
ChoreService service = new ChoreService("testMaximumChoreServiceThreads");
|
|
||||||
|
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int sleepTime = 5 * period;
|
final int sleepTime = 5 * period;
|
||||||
|
|
||||||
try {
|
|
||||||
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
// Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
|
||||||
// ALWAYS miss their start time since their execution takes longer than their period.
|
// ALWAYS miss their start time since their execution takes longer than their period.
|
||||||
// Chores that miss their start time will trigger the onChoreMissedStartTime callback
|
// Chores that miss their start time will trigger the onChoreMissedStartTime callback
|
||||||
|
@ -661,59 +593,51 @@ public class TestChoreService {
|
||||||
|
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
|
assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChangingChoreServices() throws InterruptedException {
|
public void testChangingChoreServices() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int sleepTime = 10;
|
final int sleepTime = 10;
|
||||||
ChoreService service1 = new ChoreService("testChangingChoreServices_1");
|
ChoreService anotherService = new ChoreService(name.getMethodName() + "_2");
|
||||||
ChoreService service2 = new ChoreService("testChangingChoreServices_2");
|
|
||||||
ScheduledChore chore = new DoNothingChore("sample", period);
|
ScheduledChore chore = new DoNothingChore("sample", period);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
assertFalse(chore.isScheduled());
|
assertFalse(chore.isScheduled());
|
||||||
assertFalse(service1.isChoreScheduled(chore));
|
assertFalse(service.isChoreScheduled(chore));
|
||||||
assertFalse(service2.isChoreScheduled(chore));
|
assertFalse(anotherService.isChoreScheduled(chore));
|
||||||
assertTrue(chore.getChoreServicer() == null);
|
assertTrue(chore.getChoreService() == null);
|
||||||
|
|
||||||
service1.scheduleChore(chore);
|
service.scheduleChore(chore);
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertTrue(chore.isScheduled());
|
assertTrue(chore.isScheduled());
|
||||||
assertTrue(service1.isChoreScheduled(chore));
|
assertTrue(service.isChoreScheduled(chore));
|
||||||
assertFalse(service2.isChoreScheduled(chore));
|
assertFalse(anotherService.isChoreScheduled(chore));
|
||||||
assertFalse(chore.getChoreServicer() == null);
|
assertFalse(chore.getChoreService() == null);
|
||||||
|
|
||||||
service2.scheduleChore(chore);
|
anotherService.scheduleChore(chore);
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertTrue(chore.isScheduled());
|
assertTrue(chore.isScheduled());
|
||||||
assertFalse(service1.isChoreScheduled(chore));
|
assertFalse(service.isChoreScheduled(chore));
|
||||||
assertTrue(service2.isChoreScheduled(chore));
|
assertTrue(anotherService.isChoreScheduled(chore));
|
||||||
assertFalse(chore.getChoreServicer() == null);
|
assertFalse(chore.getChoreService() == null);
|
||||||
|
|
||||||
chore.cancel();
|
chore.cancel();
|
||||||
assertFalse(chore.isScheduled());
|
assertFalse(chore.isScheduled());
|
||||||
assertFalse(service1.isChoreScheduled(chore));
|
assertFalse(service.isChoreScheduled(chore));
|
||||||
assertFalse(service2.isChoreScheduled(chore));
|
assertFalse(anotherService.isChoreScheduled(chore));
|
||||||
assertTrue(chore.getChoreServicer() == null);
|
assertTrue(chore.getChoreService() == null);
|
||||||
} finally {
|
} finally {
|
||||||
shutdownService(service1);
|
shutdownService(anotherService);
|
||||||
shutdownService(service2);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStopperForScheduledChores() throws InterruptedException {
|
public void testStopperForScheduledChores() throws InterruptedException {
|
||||||
ChoreService service = new ChoreService("testStopperForScheduledChores");
|
|
||||||
Stoppable stopperForGroup1 = new SampleStopper();
|
Stoppable stopperForGroup1 = new SampleStopper();
|
||||||
Stoppable stopperForGroup2 = new SampleStopper();
|
Stoppable stopperForGroup2 = new SampleStopper();
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int delta = period / 10;
|
final int delta = period / 10;
|
||||||
|
|
||||||
try {
|
|
||||||
ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
|
ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
|
||||||
ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
|
ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
|
||||||
ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
|
ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
|
||||||
|
@ -755,29 +679,22 @@ public class TestChoreService {
|
||||||
assertFalse(chore1_group2.isScheduled());
|
assertFalse(chore1_group2.isScheduled());
|
||||||
assertFalse(chore2_group2.isScheduled());
|
assertFalse(chore2_group2.isScheduled());
|
||||||
assertFalse(chore3_group2.isScheduled());
|
assertFalse(chore3_group2.isScheduled());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testShutdownCancelsScheduledChores() throws InterruptedException {
|
public void testShutdownCancelsScheduledChores() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
ChoreService service = new ChoreService("testShutdownCancelsScheduledChores");
|
|
||||||
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
||||||
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
||||||
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
||||||
|
|
||||||
try {
|
|
||||||
assertTrue(service.scheduleChore(successChore1));
|
assertTrue(service.scheduleChore(successChore1));
|
||||||
assertTrue(successChore1.isScheduled());
|
assertTrue(successChore1.isScheduled());
|
||||||
assertTrue(service.scheduleChore(successChore2));
|
assertTrue(service.scheduleChore(successChore2));
|
||||||
assertTrue(successChore2.isScheduled());
|
assertTrue(successChore2.isScheduled());
|
||||||
assertTrue(service.scheduleChore(successChore3));
|
assertTrue(service.scheduleChore(successChore3));
|
||||||
assertTrue(successChore3.isScheduled());
|
assertTrue(successChore3.isScheduled());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
shutdownService(service);
|
||||||
}
|
|
||||||
|
|
||||||
assertFalse(successChore1.isScheduled());
|
assertFalse(successChore1.isScheduled());
|
||||||
assertFalse(successChore2.isScheduled());
|
assertFalse(successChore2.isScheduled());
|
||||||
|
@ -788,11 +705,9 @@ public class TestChoreService {
|
||||||
public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
|
public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
final int sleep = 5 * period;
|
final int sleep = 5 * period;
|
||||||
ChoreService service = new ChoreService("testShutdownWorksWhileChoresAreExecuting");
|
|
||||||
ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
|
ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
|
||||||
ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
|
ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
|
||||||
ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
|
ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
|
||||||
try {
|
|
||||||
assertTrue(service.scheduleChore(slowChore1));
|
assertTrue(service.scheduleChore(slowChore1));
|
||||||
assertTrue(service.scheduleChore(slowChore2));
|
assertTrue(service.scheduleChore(slowChore2));
|
||||||
assertTrue(service.scheduleChore(slowChore3));
|
assertTrue(service.scheduleChore(slowChore3));
|
||||||
|
@ -807,15 +722,11 @@ public class TestChoreService {
|
||||||
|
|
||||||
Thread.sleep(5);
|
Thread.sleep(5);
|
||||||
assertTrue(service.isTerminated());
|
assertTrue(service.isTerminated());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testShutdownRejectsNewSchedules() throws InterruptedException {
|
public void testShutdownRejectsNewSchedules() throws InterruptedException {
|
||||||
final int period = 100;
|
final int period = 100;
|
||||||
ChoreService service = new ChoreService("testShutdownRejectsNewSchedules");
|
|
||||||
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
|
||||||
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
|
||||||
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
|
||||||
|
@ -823,16 +734,14 @@ public class TestChoreService {
|
||||||
ScheduledChore failChore2 = new DoNothingChore("fc2", period);
|
ScheduledChore failChore2 = new DoNothingChore("fc2", period);
|
||||||
ScheduledChore failChore3 = new DoNothingChore("fc3", period);
|
ScheduledChore failChore3 = new DoNothingChore("fc3", period);
|
||||||
|
|
||||||
try {
|
|
||||||
assertTrue(service.scheduleChore(successChore1));
|
assertTrue(service.scheduleChore(successChore1));
|
||||||
assertTrue(successChore1.isScheduled());
|
assertTrue(successChore1.isScheduled());
|
||||||
assertTrue(service.scheduleChore(successChore2));
|
assertTrue(service.scheduleChore(successChore2));
|
||||||
assertTrue(successChore2.isScheduled());
|
assertTrue(successChore2.isScheduled());
|
||||||
assertTrue(service.scheduleChore(successChore3));
|
assertTrue(service.scheduleChore(successChore3));
|
||||||
assertTrue(successChore3.isScheduled());
|
assertTrue(successChore3.isScheduled());
|
||||||
} finally {
|
|
||||||
shutdownService(service);
|
shutdownService(service);
|
||||||
}
|
|
||||||
|
|
||||||
assertFalse(service.scheduleChore(failChore1));
|
assertFalse(service.scheduleChore(failChore1));
|
||||||
assertFalse(failChore1.isScheduled());
|
assertFalse(failChore1.isScheduled());
|
||||||
|
@ -845,17 +754,38 @@ public class TestChoreService {
|
||||||
/**
|
/**
|
||||||
* for HBASE-25014
|
* for HBASE-25014
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 10000)
|
@Test
|
||||||
public void testInitialDelay() {
|
public void testInitialDelay() {
|
||||||
ChoreService service = new ChoreService(name.getMethodName());
|
|
||||||
SampleStopper stopper = new SampleStopper();
|
SampleStopper stopper = new SampleStopper();
|
||||||
service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) {
|
service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) {
|
||||||
@Override protected void chore() {
|
@Override
|
||||||
|
protected void chore() {
|
||||||
stopper.stop("test");
|
stopper.stop("test");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
while (!stopper.isStopped()) {
|
Waiter.waitFor(CONF, 5000, () -> stopper.isStopped());
|
||||||
Threads.sleep(1000);
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupWithStopper() {
|
||||||
|
SampleStopper stopper = new SampleStopper();
|
||||||
|
DoNothingChore chore = spy(new DoNothingChore("chore", stopper, 10));
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
assertTrue(chore.isScheduled());
|
||||||
|
verify(chore, never()).cleanup();
|
||||||
|
stopper.stop("test");
|
||||||
|
Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
|
||||||
|
verify(chore, atLeastOnce()).cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupWithShutdown() {
|
||||||
|
DoNothingChore chore = spy(new DoNothingChore("chore", 10));
|
||||||
|
service.scheduleChore(chore);
|
||||||
|
assertTrue(chore.isScheduled());
|
||||||
|
verify(chore, never()).cleanup();
|
||||||
|
chore.shutdown(true);
|
||||||
|
Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
|
||||||
|
verify(chore, atLeastOnce()).cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.ChoreService;
|
|
||||||
import org.apache.hadoop.hbase.ClusterId;
|
import org.apache.hadoop.hbase.ClusterId;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
|
@ -1500,11 +1499,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
try {
|
try {
|
||||||
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
|
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
|
||||||
if (on) {
|
if (on) {
|
||||||
if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
|
|
||||||
getChoreService().scheduleChore(this.snapshotCleanerChore);
|
getChoreService().scheduleChore(this.snapshotCleanerChore);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
getChoreService().cancelChore(this.snapshotCleanerChore);
|
this.snapshotCleanerChore.cancel();
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
|
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
|
||||||
|
@ -1528,24 +1525,23 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopChores() {
|
private void stopChores() {
|
||||||
ChoreService choreService = getChoreService();
|
if (getChoreService() != null) {
|
||||||
if (choreService != null) {
|
shutdownChore(mobFileCleanerChore);
|
||||||
choreService.cancelChore(this.mobFileCleanerChore);
|
shutdownChore(mobFileCompactionChore);
|
||||||
choreService.cancelChore(this.mobFileCompactionChore);
|
shutdownChore(balancerChore);
|
||||||
choreService.cancelChore(this.balancerChore);
|
|
||||||
if (regionNormalizerManager != null) {
|
if (regionNormalizerManager != null) {
|
||||||
choreService.cancelChore(regionNormalizerManager.getRegionNormalizerChore());
|
shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
|
||||||
}
|
}
|
||||||
choreService.cancelChore(this.clusterStatusChore);
|
shutdownChore(clusterStatusChore);
|
||||||
choreService.cancelChore(this.catalogJanitorChore);
|
shutdownChore(catalogJanitorChore);
|
||||||
choreService.cancelChore(this.clusterStatusPublisherChore);
|
shutdownChore(clusterStatusPublisherChore);
|
||||||
choreService.cancelChore(this.snapshotQuotaChore);
|
shutdownChore(snapshotQuotaChore);
|
||||||
choreService.cancelChore(this.logCleaner);
|
shutdownChore(logCleaner);
|
||||||
choreService.cancelChore(this.hfileCleaner);
|
shutdownChore(hfileCleaner);
|
||||||
choreService.cancelChore(this.replicationBarrierCleaner);
|
shutdownChore(replicationBarrierCleaner);
|
||||||
choreService.cancelChore(this.snapshotCleanerChore);
|
shutdownChore(snapshotCleanerChore);
|
||||||
choreService.cancelChore(this.hbckChore);
|
shutdownChore(hbckChore);
|
||||||
choreService.cancelChore(this.regionsRecoveryChore);
|
shutdownChore(regionsRecoveryChore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -70,7 +69,6 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
||||||
*/
|
*/
|
||||||
RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration,
|
RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration,
|
||||||
final HMaster hMaster) {
|
final HMaster hMaster) {
|
||||||
|
|
||||||
super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt(
|
super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt(
|
||||||
HConstants.REGIONS_RECOVERY_INTERVAL, HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL));
|
HConstants.REGIONS_RECOVERY_INTERVAL, HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL));
|
||||||
this.hMaster = hMaster;
|
this.hMaster = hMaster;
|
||||||
|
@ -125,7 +123,6 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
||||||
|
|
||||||
private Map<TableName, List<byte[]>> getTableToRegionsByRefCount(
|
private Map<TableName, List<byte[]>> getTableToRegionsByRefCount(
|
||||||
final Map<ServerName, ServerMetrics> serverMetricsMap) {
|
final Map<ServerName, ServerMetrics> serverMetricsMap) {
|
||||||
|
|
||||||
final Map<TableName, List<byte[]>> tableToReopenRegionsMap = new HashMap<>();
|
final Map<TableName, List<byte[]>> tableToReopenRegionsMap = new HashMap<>();
|
||||||
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
|
||||||
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
|
||||||
|
@ -146,13 +143,11 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tableToReopenRegionsMap;
|
return tableToReopenRegionsMap;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareTableToReopenRegionsMap(
|
private void prepareTableToReopenRegionsMap(
|
||||||
final Map<TableName, List<byte[]>> tableToReopenRegionsMap,
|
final Map<TableName, List<byte[]>> tableToReopenRegionsMap,
|
||||||
final byte[] regionName, final int regionStoreRefCount) {
|
final byte[] regionName, final int regionStoreRefCount) {
|
||||||
|
|
||||||
final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName);
|
final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName);
|
||||||
final TableName tableName = regionInfo.getTable();
|
final TableName tableName = regionInfo.getTable();
|
||||||
if (TableName.isMetaTableName(tableName)) {
|
if (TableName.isMetaTableName(tableName)) {
|
||||||
|
@ -165,21 +160,4 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
||||||
tableToReopenRegionsMap
|
tableToReopenRegionsMap
|
||||||
.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
|
.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
|
|
||||||
// is scheduled at a time - RegionsRecoveryConfigManager
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return o != null && getClass() == o.getClass();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return 31;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ChoreService;
|
import org.apache.hadoop.hbase.ChoreService;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -27,8 +28,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Config manager for RegionsRecovery Chore - Dynamically reload config and update chore
|
* Config manager for RegionsRecovery Chore - Dynamically reload config and update chore accordingly
|
||||||
* accordingly
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RegionsRecoveryConfigManager implements ConfigurationObserver {
|
public class RegionsRecoveryConfigManager implements ConfigurationObserver {
|
||||||
|
@ -36,6 +36,7 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryConfigManager.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryConfigManager.class);
|
||||||
|
|
||||||
private final HMaster hMaster;
|
private final HMaster hMaster;
|
||||||
|
private RegionsRecoveryChore chore;
|
||||||
private int prevMaxStoreFileRefCount;
|
private int prevMaxStoreFileRefCount;
|
||||||
private int prevRegionsRecoveryInterval;
|
private int prevRegionsRecoveryInterval;
|
||||||
|
|
||||||
|
@ -51,34 +52,35 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
|
||||||
final int newMaxStoreFileRefCount = getMaxStoreFileRefCount(conf);
|
final int newMaxStoreFileRefCount = getMaxStoreFileRefCount(conf);
|
||||||
final int newRegionsRecoveryInterval = getRegionsRecoveryChoreInterval(conf);
|
final int newRegionsRecoveryInterval = getRegionsRecoveryChoreInterval(conf);
|
||||||
|
|
||||||
if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount
|
if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount &&
|
||||||
&& prevRegionsRecoveryInterval == newRegionsRecoveryInterval) {
|
prevRegionsRecoveryInterval == newRegionsRecoveryInterval) {
|
||||||
// no need to re-schedule the chore with updated config
|
// no need to re-schedule the chore with updated config
|
||||||
// as there is no change in desired configs
|
// as there is no change in desired configs
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," +
|
LOG.info(
|
||||||
|
"Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," +
|
||||||
" newMaxStoreFileRefCount: {}, prevRegionsRecoveryInterval: {}, " +
|
" newMaxStoreFileRefCount: {}, prevRegionsRecoveryInterval: {}, " +
|
||||||
"newRegionsRecoveryInterval: {}", prevMaxStoreFileRefCount, newMaxStoreFileRefCount,
|
"newRegionsRecoveryInterval: {}",
|
||||||
prevRegionsRecoveryInterval, newRegionsRecoveryInterval);
|
prevMaxStoreFileRefCount, newMaxStoreFileRefCount, prevRegionsRecoveryInterval,
|
||||||
|
newRegionsRecoveryInterval);
|
||||||
|
|
||||||
RegionsRecoveryChore regionsRecoveryChore = new RegionsRecoveryChore(this.hMaster,
|
RegionsRecoveryChore regionsRecoveryChore =
|
||||||
conf, this.hMaster);
|
new RegionsRecoveryChore(this.hMaster, conf, this.hMaster);
|
||||||
ChoreService choreService = this.hMaster.getChoreService();
|
ChoreService choreService = this.hMaster.getChoreService();
|
||||||
|
|
||||||
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
||||||
// only if hbase.regions.recovery.store.file.ref.count has value > 0
|
// only if hbase.regions.recovery.store.file.ref.count has value > 0
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
if (chore != null) {
|
||||||
|
chore.shutdown();
|
||||||
|
chore = null;
|
||||||
|
}
|
||||||
if (newMaxStoreFileRefCount > 0) {
|
if (newMaxStoreFileRefCount > 0) {
|
||||||
// reschedule the chore
|
// schedule the new chore
|
||||||
// provide mayInterruptIfRunning - false to take care of completion
|
|
||||||
// of in progress task if any
|
|
||||||
choreService.cancelChore(regionsRecoveryChore, false);
|
|
||||||
choreService.scheduleChore(regionsRecoveryChore);
|
choreService.scheduleChore(regionsRecoveryChore);
|
||||||
} else {
|
chore = regionsRecoveryChore;
|
||||||
choreService.cancelChore(regionsRecoveryChore, false);
|
|
||||||
}
|
}
|
||||||
this.prevMaxStoreFileRefCount = newMaxStoreFileRefCount;
|
this.prevMaxStoreFileRefCount = newMaxStoreFileRefCount;
|
||||||
this.prevRegionsRecoveryInterval = newRegionsRecoveryInterval;
|
this.prevRegionsRecoveryInterval = newRegionsRecoveryInterval;
|
||||||
|
@ -86,15 +88,18 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getMaxStoreFileRefCount(Configuration configuration) {
|
private int getMaxStoreFileRefCount(Configuration configuration) {
|
||||||
return configuration.getInt(
|
return configuration.getInt(HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
|
||||||
HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
|
|
||||||
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
|
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getRegionsRecoveryChoreInterval(Configuration configuration) {
|
private int getRegionsRecoveryChoreInterval(Configuration configuration) {
|
||||||
return configuration.getInt(
|
return configuration.getInt(HConstants.REGIONS_RECOVERY_INTERVAL,
|
||||||
HConstants.REGIONS_RECOVERY_INTERVAL,
|
|
||||||
HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL);
|
HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RestrictedApi(explanation = "Only visible for testing", link = "",
|
||||||
|
allowedOnPath = ".*/src/test/.*")
|
||||||
|
RegionsRecoveryChore getChore() {
|
||||||
|
return chore;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -936,7 +936,7 @@ public class ServerManager {
|
||||||
*/
|
*/
|
||||||
public void stop() {
|
public void stop() {
|
||||||
if (flushedSeqIdFlusher != null) {
|
if (flushedSeqIdFlusher != null) {
|
||||||
flushedSeqIdFlusher.cancel();
|
flushedSeqIdFlusher.shutdown();
|
||||||
}
|
}
|
||||||
if (persistFlushedSequenceId) {
|
if (persistFlushedSequenceId) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -456,7 +456,7 @@ public class SplitLogManager {
|
||||||
choreService.shutdown();
|
choreService.shutdown();
|
||||||
}
|
}
|
||||||
if (timeoutMonitor != null) {
|
if (timeoutMonitor != null) {
|
||||||
timeoutMonitor.cancel(true);
|
timeoutMonitor.shutdown(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -102,7 +102,7 @@ public class QuotaCache implements Stoppable {
|
||||||
public void stop(final String why) {
|
public void stop(final String why) {
|
||||||
if (refreshChore != null) {
|
if (refreshChore != null) {
|
||||||
LOG.debug("Stopping QuotaRefresherChore chore.");
|
LOG.debug("Stopping QuotaRefresherChore chore.");
|
||||||
refreshChore.cancel(true);
|
refreshChore.shutdown(true);
|
||||||
}
|
}
|
||||||
stopped = true;
|
stopped = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,11 +98,11 @@ public class RegionServerSpaceQuotaManager {
|
||||||
|
|
||||||
public synchronized void stop() {
|
public synchronized void stop() {
|
||||||
if (spaceQuotaRefresher != null) {
|
if (spaceQuotaRefresher != null) {
|
||||||
spaceQuotaRefresher.cancel();
|
spaceQuotaRefresher.shutdown();
|
||||||
spaceQuotaRefresher = null;
|
spaceQuotaRefresher = null;
|
||||||
}
|
}
|
||||||
if (regionSizeReporter != null) {
|
if (regionSizeReporter != null) {
|
||||||
regionSizeReporter.cancel();
|
regionSizeReporter.shutdown();
|
||||||
regionSizeReporter = null;
|
regionSizeReporter = null;
|
||||||
}
|
}
|
||||||
started = false;
|
started = false;
|
||||||
|
|
|
@ -2642,6 +2642,11 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected final void shutdownChore(ScheduledChore chore) {
|
||||||
|
if (chore != null) {
|
||||||
|
chore.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Wait on all threads to finish. Presumption is that all closes and stops
|
* Wait on all threads to finish. Presumption is that all closes and stops
|
||||||
* have already been called.
|
* have already been called.
|
||||||
|
@ -2649,15 +2654,16 @@ public class HRegionServer extends Thread implements
|
||||||
protected void stopServiceThreads() {
|
protected void stopServiceThreads() {
|
||||||
// clean up the scheduled chores
|
// clean up the scheduled chores
|
||||||
if (this.choreService != null) {
|
if (this.choreService != null) {
|
||||||
choreService.cancelChore(nonceManagerChore);
|
shutdownChore(nonceManagerChore);
|
||||||
choreService.cancelChore(compactionChecker);
|
shutdownChore(compactionChecker);
|
||||||
choreService.cancelChore(periodicFlusher);
|
shutdownChore(periodicFlusher);
|
||||||
choreService.cancelChore(healthCheckChore);
|
shutdownChore(healthCheckChore);
|
||||||
choreService.cancelChore(executorStatusChore);
|
shutdownChore(executorStatusChore);
|
||||||
choreService.cancelChore(storefileRefresher);
|
shutdownChore(storefileRefresher);
|
||||||
choreService.cancelChore(fsUtilizationChore);
|
shutdownChore(fsUtilizationChore);
|
||||||
choreService.cancelChore(slowLogTableOpsChore);
|
shutdownChore(slowLogTableOpsChore);
|
||||||
// clean up the remaining scheduled chores (in case we missed out any)
|
// cancel the remaining scheduled chores (in case we missed out any)
|
||||||
|
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
|
||||||
choreService.shutdown();
|
choreService.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -216,7 +216,7 @@ public class HeapMemoryManager {
|
||||||
public void stop() {
|
public void stop() {
|
||||||
// The thread is Daemon. Just interrupting the ongoing process.
|
// The thread is Daemon. Just interrupting the ongoing process.
|
||||||
LOG.info("Stopping");
|
LOG.info("Stopping");
|
||||||
this.heapMemTunerChore.cancel(true);
|
this.heapMemTunerChore.shutdown(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerTuneObserver(HeapMemoryTuneObserver observer) {
|
public void registerTuneObserver(HeapMemoryTuneObserver observer) {
|
||||||
|
|
|
@ -18,18 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import java.io.IOException;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -51,8 +51,6 @@ public class TestRegionsRecoveryConfigManager {
|
||||||
|
|
||||||
private HMaster hMaster;
|
private HMaster hMaster;
|
||||||
|
|
||||||
private RegionsRecoveryChore regionsRecoveryChore;
|
|
||||||
|
|
||||||
private RegionsRecoveryConfigManager regionsRecoveryConfigManager;
|
private RegionsRecoveryConfigManager regionsRecoveryConfigManager;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
@ -62,10 +60,8 @@ public class TestRegionsRecoveryConfigManager {
|
||||||
conf = HBASE_TESTING_UTILITY.getConfiguration();
|
conf = HBASE_TESTING_UTILITY.getConfiguration();
|
||||||
conf.unset("hbase.regions.recovery.store.file.ref.count");
|
conf.unset("hbase.regions.recovery.store.file.ref.count");
|
||||||
conf.unset("hbase.master.regions.recovery.check.interval");
|
conf.unset("hbase.master.regions.recovery.check.interval");
|
||||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
StartMiniClusterOption option = StartMiniClusterOption.builder().masterClass(TestHMaster.class)
|
||||||
.masterClass(TestHMaster.class)
|
.numRegionServers(1).numDataNodes(1).build();
|
||||||
.numRegionServers(1)
|
|
||||||
.numDataNodes(1).build();
|
|
||||||
HBASE_TESTING_UTILITY.startMiniCluster(option);
|
HBASE_TESTING_UTILITY.startMiniCluster(option);
|
||||||
cluster = HBASE_TESTING_UTILITY.getMiniHBaseCluster();
|
cluster = HBASE_TESTING_UTILITY.getMiniHBaseCluster();
|
||||||
}
|
}
|
||||||
|
@ -77,44 +73,44 @@ public class TestRegionsRecoveryConfigManager {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChoreSchedule() throws Exception {
|
public void testChoreSchedule() throws Exception {
|
||||||
|
|
||||||
this.hMaster = cluster.getMaster();
|
this.hMaster = cluster.getMaster();
|
||||||
|
|
||||||
Stoppable stoppable = new StoppableImplementation();
|
|
||||||
this.regionsRecoveryChore = new RegionsRecoveryChore(stoppable, conf, hMaster);
|
|
||||||
|
|
||||||
this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this.hMaster);
|
this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this.hMaster);
|
||||||
// not yet scheduled
|
// not yet scheduled
|
||||||
Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertFalse(
|
||||||
|
hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// not yet scheduled
|
// not yet scheduled
|
||||||
Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertFalse(
|
||||||
|
hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
conf.setInt("hbase.master.regions.recovery.check.interval", 10);
|
conf.setInt("hbase.master.regions.recovery.check.interval", 10);
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// not yet scheduled - missing config: hbase.regions.recovery.store.file.ref.count
|
// not yet scheduled - missing config: hbase.regions.recovery.store.file.ref.count
|
||||||
Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertFalse(
|
||||||
|
hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
conf.setInt("hbase.regions.recovery.store.file.ref.count", 10);
|
conf.setInt("hbase.regions.recovery.store.file.ref.count", 10);
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// chore scheduled
|
// chore scheduled
|
||||||
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
|
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// chore re-scheduled
|
// chore re-scheduled
|
||||||
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
|
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// chore scheduling untouched
|
// chore scheduling untouched
|
||||||
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
|
|
||||||
conf.unset("hbase.regions.recovery.store.file.ref.count");
|
conf.unset("hbase.regions.recovery.store.file.ref.count");
|
||||||
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
|
||||||
// chore un-scheduled
|
// chore un-scheduled
|
||||||
Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
|
assertFalse(
|
||||||
|
hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make it public so that JVMClusterUtil can access it.
|
// Make it public so that JVMClusterUtil can access it.
|
||||||
|
@ -123,24 +119,4 @@ public class TestRegionsRecoveryConfigManager {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Simple helper class that just keeps track of whether or not its stopped.
|
|
||||||
*/
|
|
||||||
private static class StoppableImplementation implements Stoppable {
|
|
||||||
|
|
||||||
private boolean stop = false;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop(String why) {
|
|
||||||
this.stop = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStopped() {
|
|
||||||
return this.stop;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class TestCatalogJanitor {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown() {
|
public void teardown() {
|
||||||
this.janitor.cancel(true);
|
this.janitor.shutdown(true);
|
||||||
this.masterServices.stop("DONE");
|
this.masterServices.stop("DONE");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue