HBASE-2448 Scanner threads are interrupted without acquiring lock properly
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@937651 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2efdd9d54f
commit
cc956afc20
|
@ -83,6 +83,15 @@ public abstract class Chore extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the thread is currently sleeping, trigger the core to happen immediately.
|
||||||
|
* If it's in the middle of its operation, will begin another operation
|
||||||
|
* immediately after finishing this one.
|
||||||
|
*/
|
||||||
|
public void triggerNow() {
|
||||||
|
this.sleeper.skipSleepCycle();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override to run a task before we start looping.
|
* Override to run a task before we start looping.
|
||||||
* @return true if initial chore was successful
|
* @return true if initial chore was successful
|
||||||
|
|
|
@ -588,9 +588,9 @@ abstract class BaseScanner extends Chore implements HConstants {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify the thread to die at the end of its next run
|
* Interrupt thread regardless of what it's doing
|
||||||
*/
|
*/
|
||||||
public void interruptIfAlive() {
|
public void interruptAndStop() {
|
||||||
synchronized(scannerLock){
|
synchronized(scannerLock){
|
||||||
if (isAlive()) {
|
if (isAlive()) {
|
||||||
super.interrupt();
|
super.interrupt();
|
||||||
|
|
|
@ -72,6 +72,6 @@ class ModifyTableMeta extends TableOperation {
|
||||||
updateRegionInfo(server, m.getRegionName(), i);
|
updateRegionInfo(server, m.getRegionName(), i);
|
||||||
}
|
}
|
||||||
// kick off a meta scan right away
|
// kick off a meta scan right away
|
||||||
master.getRegionManager().metaScannerThread.interrupt();
|
master.getRegionManager().metaScannerThread.triggerNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
|
||||||
master.getRegionManager().putMetaRegionOnline(m);
|
master.getRegionManager().putMetaRegionOnline(m);
|
||||||
// Interrupting the Meta Scanner sleep so that it can
|
// Interrupting the Meta Scanner sleep so that it can
|
||||||
// process regions right away
|
// process regions right away
|
||||||
master.getRegionManager().metaScannerThread.interrupt();
|
master.getRegionManager().metaScannerThread.triggerNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If updated successfully, remove from pending list if the state
|
// If updated successfully, remove from pending list if the state
|
||||||
|
|
|
@ -581,11 +581,11 @@ public class RegionManager implements HConstants {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("telling root scanner to stop");
|
LOG.debug("telling root scanner to stop");
|
||||||
}
|
}
|
||||||
rootScannerThread.interruptIfAlive();
|
rootScannerThread.interruptAndStop();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("telling meta scanner to stop");
|
LOG.debug("telling meta scanner to stop");
|
||||||
}
|
}
|
||||||
metaScannerThread.interruptIfAlive();
|
metaScannerThread.interruptAndStop();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("meta and root scanners notified");
|
LOG.debug("meta and root scanners notified");
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,9 @@ public class Sleeper {
|
||||||
private final AtomicBoolean stop;
|
private final AtomicBoolean stop;
|
||||||
private static final long MINIMAL_DELTA_FOR_LOGGING = 10000;
|
private static final long MINIMAL_DELTA_FOR_LOGGING = 10000;
|
||||||
|
|
||||||
|
private final Object sleepLock = new Object();
|
||||||
|
private boolean triggerWake = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param sleep sleep time in milliseconds
|
* @param sleep sleep time in milliseconds
|
||||||
* @param stop flag for when we stop
|
* @param stop flag for when we stop
|
||||||
|
@ -52,6 +55,17 @@ public class Sleeper {
|
||||||
sleep(System.currentTimeMillis());
|
sleep(System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If currently asleep, stops sleeping; if not asleep, will skip the next
|
||||||
|
* sleep cycle.
|
||||||
|
*/
|
||||||
|
public void skipSleepCycle() {
|
||||||
|
synchronized (sleepLock) {
|
||||||
|
triggerWake = true;
|
||||||
|
sleepLock.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sleep for period adjusted by passed <code>startTime<code>
|
* Sleep for period adjusted by passed <code>startTime<code>
|
||||||
* @param startTime Time some task started previous to now. Time to sleep
|
* @param startTime Time some task started previous to now. Time to sleep
|
||||||
|
@ -72,7 +86,10 @@ public class Sleeper {
|
||||||
while (waitTime > 0) {
|
while (waitTime > 0) {
|
||||||
long woke = -1;
|
long woke = -1;
|
||||||
try {
|
try {
|
||||||
Thread.sleep(waitTime);
|
synchronized (sleepLock) {
|
||||||
|
if (triggerWake) break;
|
||||||
|
sleepLock.wait(waitTime);
|
||||||
|
}
|
||||||
woke = System.currentTimeMillis();
|
woke = System.currentTimeMillis();
|
||||||
long slept = woke - now;
|
long slept = woke - now;
|
||||||
if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) {
|
if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) {
|
||||||
|
@ -92,5 +109,6 @@ public class Sleeper {
|
||||||
woke = (woke == -1)? System.currentTimeMillis(): woke;
|
woke = (woke == -1)? System.currentTimeMillis(): woke;
|
||||||
waitTime = this.period - (woke - startTime);
|
waitTime = this.period - (woke - startTime);
|
||||||
}
|
}
|
||||||
|
triggerWake = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue