HBASE-11403 Fix race conditions around Object#notify
This commit is contained in:
parent
e991fd8055
commit
ab72babd97
|
@ -901,6 +901,7 @@ class AsyncProcess {
|
|||
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
|
||||
}
|
||||
synchronized (actionsInProgress) {
|
||||
if (actionsInProgress.get() == 0) break;
|
||||
actionsInProgress.wait(100);
|
||||
}
|
||||
}
|
||||
|
@ -979,6 +980,7 @@ class AsyncProcess {
|
|||
oldInProgress = currentInProgress;
|
||||
try {
|
||||
synchronized (this.tasksInProgress) {
|
||||
if (tasksInProgress.get() != oldInProgress) break;
|
||||
this.tasksInProgress.wait(100);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -1488,6 +1488,7 @@ public class RpcClient {
|
|||
}
|
||||
try {
|
||||
synchronized (call) {
|
||||
if (call.done) break;
|
||||
call.wait(Math.min(call.remainingTime(), 1000) + 1);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -334,7 +334,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
if (!successfulAddition && wait) {
|
||||
synchronized (cacheWaitSignals[queueNum]) {
|
||||
try {
|
||||
cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
|
||||
successfulAddition = bq.offer(re);
|
||||
if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
|
|
@ -543,8 +543,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
this.initializationBeforeMetaAssignment = true;
|
||||
|
||||
// Wait for regionserver to finish initialization.
|
||||
while (!isStopped() && !isOnline()) {
|
||||
synchronized (online) {
|
||||
while (!isStopped() && !isOnline()) {
|
||||
online.wait(100);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -511,7 +511,8 @@ public class ServerManager {
|
|||
long previousLogTime = 0;
|
||||
ServerName sn = master.getServerName();
|
||||
ZooKeeperWatcher zkw = master.getZooKeeper();
|
||||
while (!onlineServers.isEmpty()) {
|
||||
int onlineServersCt;
|
||||
while ((onlineServersCt = onlineServers.size()) > 0){
|
||||
|
||||
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
|
||||
Set<ServerName> remainingServers = onlineServers.keySet();
|
||||
|
@ -548,7 +549,7 @@ public class ServerManager {
|
|||
}
|
||||
synchronized (onlineServers) {
|
||||
try {
|
||||
onlineServers.wait(100);
|
||||
if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
|
||||
} catch (InterruptedException ignored) {
|
||||
// continue
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ class LogRoller extends HasThread implements WALActionsListener {
|
|||
if (!periodic) {
|
||||
synchronized (rollLog) {
|
||||
try {
|
||||
rollLog.wait(this.threadWakeFrequency);
|
||||
if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
// Fall through
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
try {
|
||||
// Wait for 10 seconds, so that server shutdown
|
||||
// won't take too long if this thread happens to run.
|
||||
signaller.wait(10000);
|
||||
if (!signaller.get()) signaller.wait(10000);
|
||||
} catch (InterruptedException e) {
|
||||
// Go to the loop check.
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue