HBASE-10524 Correct wrong handling and add proper handling for swallowed InterruptedException thrown by Thread.sleep in regionserver (Feng Honghua)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1570219 13f79535-47bb-0310-9956-ffa450edef68
nkeywal 2014-02-20 15:16:46 +00:00
parent 65fa5c4d82
commit 3f485568a9
2 changed files with 84 additions and 91 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

@@ -1939,46 +1939,55 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
     RegionServerStatusService.BlockingInterface master = null;
     boolean refresh = false; // for the first time, use cached data
     RegionServerStatusService.BlockingInterface intf = null;
-    while (keepLooping() && master == null) {
-      sn = this.masterAddressManager.getMasterAddress(refresh);
-      if (sn == null) {
-        if (!keepLooping()) {
-          // give up with no connection.
-          LOG.debug("No master found and cluster is stopped; bailing out");
-          return null;
+    boolean interrupted = false;
+    try {
+      while (keepLooping() && master == null) {
+        sn = this.masterAddressManager.getMasterAddress(refresh);
+        if (sn == null) {
+          if (!keepLooping()) {
+            // give up with no connection.
+            LOG.debug("No master found and cluster is stopped; bailing out");
+            return null;
+          }
+          LOG.debug("No master found; retry");
+          previousLogTime = System.currentTimeMillis();
+          refresh = true; // let's try pull it from ZK directly
+          sleeper.sleep();
+          continue;
         }
-        LOG.debug("No master found; retry");
-        previousLogTime = System.currentTimeMillis();
-        refresh = true; // let's try pull it from ZK directly
-        sleeper.sleep();
-        continue;
-      }
 
-      new InetSocketAddress(sn.getHostname(), sn.getPort());
-      try {
-        BlockingRpcChannel channel =
-          this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
-        intf = RegionServerStatusService.newBlockingStub(channel);
-        break;
-      } catch (IOException e) {
-        e = e instanceof RemoteException ?
-          ((RemoteException)e).unwrapRemoteException() : e;
-        if (e instanceof ServerNotRunningYetException) {
-          if (System.currentTimeMillis() > (previousLogTime+1000)){
-            LOG.info("Master isn't available yet, retrying");
-            previousLogTime = System.currentTimeMillis();
-          }
-        } else {
-          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
-            LOG.warn("Unable to connect to master. Retrying. Error was:", e);
-            previousLogTime = System.currentTimeMillis();
-          }
-        }
+        new InetSocketAddress(sn.getHostname(), sn.getPort());
         try {
-          Thread.sleep(200);
-        } catch (InterruptedException ignored) {
+          BlockingRpcChannel channel =
+            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
+          intf = RegionServerStatusService.newBlockingStub(channel);
+          break;
+        } catch (IOException e) {
+          e = e instanceof RemoteException ?
+            ((RemoteException)e).unwrapRemoteException() : e;
+          if (e instanceof ServerNotRunningYetException) {
+            if (System.currentTimeMillis() > (previousLogTime+1000)){
+              LOG.info("Master isn't available yet, retrying");
+              previousLogTime = System.currentTimeMillis();
+            }
+          } else {
+            if (System.currentTimeMillis() > (previousLogTime + 1000)) {
+              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
+              previousLogTime = System.currentTimeMillis();
+            }
+          }
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException ex) {
+            interrupted = true;
+            LOG.warn("Interrupted while sleeping");
+          }
         }
       }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
     }
     return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
   }
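The hunk above keeps the 200 ms retry sleep but records an interrupt in a local flag and re-asserts it in a finally block once the loop exits, instead of swallowing the InterruptedException. A minimal, self-contained sketch of that idiom follows; the names RetrySketch, connectWithRetry and tryOnce are illustrative only and are not part of HBase.

// Sketch of the "remember the interrupt, restore it in finally" idiom used above.
public class RetrySketch {

  // Pretend connection attempt; fails a couple of times before succeeding.
  private static int attempts = 0;
  private static boolean tryOnce() {
    return ++attempts >= 3;
  }

  public static boolean connectWithRetry() {
    boolean interrupted = false;
    try {
      while (true) {
        if (tryOnce()) {
          return true;               // connected
        }
        try {
          Thread.sleep(200);         // back off before the next attempt
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: note it and keep going,
          // the flag is restored before the method returns.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();   // let callers see the interrupt
      }
    }
  }

  public static void main(String[] args) {
    System.out.println("connected: " + connectWithRetry()
        + ", interrupt flag: " + Thread.currentThread().isInterrupted());
  }
}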

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -190,26 +191,23 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
           LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
         }
         if (res == -1) {
-          try {
-            LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
-                + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
-                "exiting anyway)"));
-            exitWorker = true;
-            break;
-          }
+          LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
+          Thread.sleep(1000);
         }
       }
 
       if (!exitWorker) {
         taskLoop();
       }
     } catch (Throwable t) {
-      // only a logical error can cause here. Printing it out
-      // to make debugging easier
-      LOG.error("unexpected error ", t);
+      if (ExceptionUtil.isInterrupt(t)) {
+        LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" :
+            " (ERROR: exitWorker is not set, exiting anyway)"));
+      } else {
+        // only a logical error can cause here. Printing it out
+        // to make debugging easier
+        LOG.error("unexpected error ", t);
+      }
     } finally {
       LOG.info("SplitLogWorker " + this.serverName + " exiting");
     }
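In this hunk the worker stops catching InterruptedException inside the wait loop and instead lets it reach the top-level catch of run(), where interrupt-related throwables are logged as a normal exit rather than an "unexpected error". The sketch below shows the same shape with a stand-alone worker; its isInterrupt() helper only approximates what a utility such as ExceptionUtil.isInterrupt is expected to check, and all names are illustrative.

import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;

public class WorkerSketch implements Runnable {

  // True if the throwable looks like the thread was asked to stop.
  static boolean isInterrupt(Throwable t) {
    return t instanceof InterruptedException
        || t instanceof InterruptedIOException
        || t instanceof ClosedByInterruptException;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        Thread.sleep(1000);   // stands in for the real polling work
      }
    } catch (Throwable t) {
      if (isInterrupt(t)) {
        System.out.println("worker interrupted, exiting");   // expected shutdown path
      } else {
        t.printStackTrace();                                  // genuine bug, make it loud
      }
    } finally {
      System.out.println("worker exiting");
    }
  }

  public static void main(String[] args) throws Exception {
    Thread t = new Thread(new WorkerSketch());
    t.start();
    Thread.sleep(100);
    t.interrupt();   // Thread.sleep in run() throws InterruptedException
    t.join();
  }
}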
@@ -223,7 +221,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    * Synchronization using {@link #taskReadyLock} ensures that it will
    * try to grab every task that has been put up
    */
-  private void taskLoop() {
+  private void taskLoop() throws InterruptedException {
     while (!exitWorker) {
       int seq_start = taskReadySeq;
       List<String> paths = getTaskList();
@@ -259,50 +257,41 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
       synchronized (taskReadyLock) {
         while (seq_start == taskReadySeq) {
-          try {
-            taskReadyLock.wait(checkInterval);
-            if (this.server != null) {
-              // check to see if we have stale recovering regions in our internal memory state
-              Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
-              if (!recoveringRegions.isEmpty()) {
-                // Make a local copy to prevent ConcurrentModificationException when other threads
-                // modify recoveringRegions
-                List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
-                for (String region : tmpCopy) {
-                  String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
-                  try {
-                    if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
-                      HRegion r = recoveringRegions.remove(region);
-                      if (r != null) {
-                        r.setRecovering(false);
-                      }
-                      LOG.debug("Mark recovering region:" + region + " up.");
-                    } else {
-                      // current check is a defensive(or redundant) mechanism to prevent us from
-                      // having stale recovering regions in our internal RS memory state while
-                      // zookeeper(source of truth) says differently. We stop at the first good one
-                      // because we should not have a single instance such as this in normal case so
-                      // check the first one is good enough.
-                      break;
-                    }
-                  } catch (KeeperException e) {
-                    // ignore zookeeper error
-                    LOG.debug("Got a zookeeper when trying to open a recovering region", e);
-                    break;
-                  }
-                }
-              }
-            }
-          } catch (InterruptedException e) {
-            LOG.info("SplitLogWorker interrupted while waiting for task," +
-              " exiting: " + e.toString() + (exitWorker ? "" :
-              " (ERROR: exitWorker is not set, exiting anyway)"));
-            exitWorker = true;
-            return;
-          }
+          taskReadyLock.wait(checkInterval);
+          if (this.server != null) {
+            // check to see if we have stale recovering regions in our internal memory state
+            Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
+            if (!recoveringRegions.isEmpty()) {
+              // Make a local copy to prevent ConcurrentModificationException when other threads
+              // modify recoveringRegions
+              List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+              for (String region : tmpCopy) {
+                String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
+                try {
+                  if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
+                    HRegion r = recoveringRegions.remove(region);
+                    if (r != null) {
+                      r.setRecovering(false);
+                    }
+                    LOG.debug("Mark recovering region:" + region + " up.");
+                  } else {
+                    // current check is a defensive(or redundant) mechanism to prevent us from
+                    // having stale recovering regions in our internal RS memory state while
+                    // zookeeper(source of truth) says differently. We stop at the first good one
+                    // because we should not have a single instance such as this in normal case so
+                    // check the first one is good enough.
+                    break;
+                  }
+                } catch (KeeperException e) {
+                  // ignore zookeeper error
+                  LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+                  break;
+                }
+              }
+            }
+          }
         }
       }
     }
   }
@@ -559,7 +548,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
   }
 
-  private List<String> getTaskList() {
+  private List<String> getTaskList() throws InterruptedException {
     List<String> childrenPaths = null;
     long sleepTime = 1000;
     // It will be in loop till it gets the list of children or
@@ -575,14 +564,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         LOG.warn("Could not get children of znode "
             + this.watcher.splitLogZNode, e);
       }
-      try {
-        LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
-            + " after sleep for " + sleepTime + "ms!");
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e1) {
-        LOG.warn("Interrupted while trying to get task list ...", e1);
-        Thread.currentThread().interrupt();
-      }
+      LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
+          + " after sleep for " + sleepTime + "ms!");
+      Thread.sleep(sleepTime);
     }
     return childrenPaths;
   }
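taskLoop() and getTaskList() now declare throws InterruptedException instead of wrapping each Thread.sleep/wait in its own catch, so the decision about what an interrupt means is made once, by the caller in run(). A minimal sketch of that split is below; the names PollerSketch and fetchOnce are assumptions for illustration, not HBase code.

import java.util.Collections;
import java.util.List;

public class PollerSketch {

  // Pretend ZooKeeper listing that is not ready for the first few calls.
  private int calls = 0;
  private List<String> fetchOnce() {
    return ++calls < 3 ? null : Collections.singletonList("task-0");
  }

  // Blocks until a list is available; propagates interrupts to the caller.
  List<String> getTaskList() throws InterruptedException {
    List<String> children = null;
    while (children == null) {
      children = fetchOnce();
      if (children == null) {
        Thread.sleep(1000);   // no local catch: an interrupt ends the wait
      }
    }
    return children;
  }

  public static void main(String[] args) {
    try {
      System.out.println(new PollerSketch().getTaskList());
    } catch (InterruptedException e) {
      // The caller, not the helper, decides how to shut down.
      Thread.currentThread().interrupt();
      System.out.println("interrupted while waiting for tasks");
    }
  }
}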