HBASE-10516 Refactor code where Threads.sleep is called within a while/for loop (Feng Honghua)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1570524 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-02-21 10:56:47 +00:00
parent 7378dd8521
commit b0c77d1d7e
7 changed files with 101 additions and 50 deletions

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@ -372,9 +371,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
long finished = System.currentTimeMillis() +
this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
while (System.currentTimeMillis() < finished) {
Threads.sleep(1);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
throw new RuntimeException("Interrupted while waiting for" +
" recoverableZooKeeper is set");
}
if (this.recoverableZooKeeper != null) break;
}
if (this.recoverableZooKeeper == null) {
LOG.error("ZK is null on connection event -- see stack trace " +
"for the stack trace when constructor was called on this zkw",

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -869,8 +868,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
victimHandler.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
if (!this.scheduleThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
Thread.currentThread().interrupt();
break;
}
}
}
if (!this.scheduleThreadPool.isShutdown()) {
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
LOG.debug("Still running " + runnables);

View File

@ -1449,7 +1449,8 @@ public class AssignmentManager extends ZooKeeperListener {
* @param regions Regions to assign.
* @return true if successful
*/
boolean assign(final ServerName destination, final List<HRegionInfo> regions) {
boolean assign(final ServerName destination, final List<HRegionInfo> regions)
throws InterruptedException {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
try {
int regionCount = regions.size();
@ -1512,7 +1513,7 @@ public class AssignmentManager extends ZooKeeperListener {
oldCounter = count;
}
if (count >= total) break;
Threads.sleep(5);
Thread.sleep(5);
}
if (server.isStopped()) {
@ -1615,8 +1616,6 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Unable to communicate with " + destination
+ " in order to assign regions, ", e);
return false;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
for (Lock lock : locks.values()) {

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.util.List;
@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
@ -78,7 +78,12 @@ public class DeleteTableHandler extends TableEventHandler {
am.regionOffline(region);
}
if (!states.isRegionInTransition(region)) break;
Threads.sleep(waitingTimeForEvents);
try {
Thread.sleep(waitingTimeForEvents);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
LOG.debug("Waiting on region to clear regions in transition; "
+ am.getRegionStates().getRegionTransitionState(region));
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -911,7 +912,11 @@ public class HRegionFileSystem {
} catch (IOException ioe) {
lastIOE = ioe;
if (fs.exists(dir)) return true; // directory is present
sleepBeforeRetry("Create Directory", i+1);
try {
sleepBeforeRetry("Create Directory", i+1);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
} while (++i <= hdfsClientRetriesNumber);
throw new IOException("Exception in createDir", lastIOE);
@ -934,9 +939,14 @@ public class HRegionFileSystem {
lastIOE = ioe;
if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
// dir is not there, retry after some time.
sleepBeforeRetry("Rename Directory", i+1);
try {
sleepBeforeRetry("Rename Directory", i+1);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
} while (++i <= hdfsClientRetriesNumber);
throw new IOException("Exception in rename", lastIOE);
}
@ -956,16 +966,21 @@ public class HRegionFileSystem {
lastIOE = ioe;
if (!fs.exists(dir)) return true;
// dir is there, retry deleting after some time.
sleepBeforeRetry("Delete Directory", i+1);
try {
sleepBeforeRetry("Delete Directory", i+1);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
} while (++i <= hdfsClientRetriesNumber);
throw new IOException("Exception in DeleteDir", lastIOE);
}
/**
* sleeping logic; handles the interrupt exception.
*/
private void sleepBeforeRetry(String msg, int sleepMultiplier) {
private void sleepBeforeRetry(String msg, int sleepMultiplier) throws InterruptedException {
sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
}
@ -993,9 +1008,14 @@ public class HRegionFileSystem {
} catch (IOException ioe) {
lastIOE = ioe;
if (fs.exists(dir)) return true; // directory is present
sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
try {
sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
} while (++i <= hdfsClientRetriesNumber);
throw new IOException("Exception in createDir", lastIOE);
}
@ -1004,12 +1024,12 @@ public class HRegionFileSystem {
* for this to avoid re-looking for the integer values.
*/
private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
int hdfsClientRetriesNumber) {
int hdfsClientRetriesNumber) throws InterruptedException {
if (sleepMultiplier > hdfsClientRetriesNumber) {
LOG.debug(msg + ", retries exhausted");
return;
}
LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
Threads.sleep((long)baseSleepBeforeRetries * sleepMultiplier);
Thread.sleep((long)baseSleepBeforeRetries * sleepMultiplier);
}
}

View File

@ -1127,43 +1127,55 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
int lastCount = -1;
long previousLogTime = 0;
Set<String> closedRegions = new HashSet<String>();
while (!isOnlineRegionsEmpty()) {
int count = getNumberOfOnlineRegions();
// Only print a message if the count of regions has changed.
if (count != lastCount) {
// Log every second at most
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
previousLogTime = System.currentTimeMillis();
lastCount = count;
LOG.info("Waiting on " + count + " regions to close");
// Only print out regions still closing if a small number else will
// swamp the log.
if (count < 10 && LOG.isDebugEnabled()) {
LOG.debug(this.onlineRegions);
boolean interrupted = false;
try {
while (!isOnlineRegionsEmpty()) {
int count = getNumberOfOnlineRegions();
// Only print a message if the count of regions has changed.
if (count != lastCount) {
// Log every second at most
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
previousLogTime = System.currentTimeMillis();
lastCount = count;
LOG.info("Waiting on " + count + " regions to close");
// Only print out regions still closing if a small number else will
// swamp the log.
if (count < 10 && LOG.isDebugEnabled()) {
LOG.debug(this.onlineRegions);
}
}
}
}
// Ensure all user regions have been sent a close. Use this to
// protect against the case where an open comes in after we start the
// iterator of onlineRegions to close all user regions.
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegionInfo hri = e.getValue().getRegionInfo();
if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
&& !closedRegions.contains(hri.getEncodedName())) {
closedRegions.add(hri.getEncodedName());
// Don't update zk with this close transition; pass false.
closeRegionIgnoreErrors(hri, abort);
// Ensure all user regions have been sent a close. Use this to
// protect against the case where an open comes in after we start the
// iterator of onlineRegions to close all user regions.
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegionInfo hri = e.getValue().getRegionInfo();
if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
&& !closedRegions.contains(hri.getEncodedName())) {
closedRegions.add(hri.getEncodedName());
// Don't update zk with this close transition; pass false.
closeRegionIgnoreErrors(hri, abort);
}
}
// No regions in RIT, we could stop waiting now.
if (this.regionsInTransitionInRS.isEmpty()) {
if (!isOnlineRegionsEmpty()) {
LOG.info("We were exiting though online regions are not empty," +
" because some regions failed closing");
}
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
interrupted = true;
LOG.warn("Interrupted while sleeping");
}
}
// No regions in RIT, we could stop waiting now.
if (this.regionsInTransitionInRS.isEmpty()) {
if (!isOnlineRegionsEmpty()) {
LOG.info("We were exiting though online regions are not empty," +
" because some regions failed closing");
}
break;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
Threads.sleep(200);
}
}

View File

@ -1188,7 +1188,8 @@ public class TestAssignmentManager {
}
@Override
boolean assign(ServerName destination, List<HRegionInfo> regions) {
boolean assign(ServerName destination, List<HRegionInfo> regions)
throws InterruptedException {
if (enabling) {
for (HRegionInfo region : regions) {
assignmentCount++;