HADOOP-2722 Prevent unintentional thread exit in region server and master
git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk/src/contrib/hbase@616298 13f79535-47bb-0310-9956-ffa450edef68
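
The patch applies one pattern across Chore, HMaster, and the HRegionServer worker threads: catch Exception inside each loop iteration so a single failure cannot end the thread, reserve Throwable for triggering shutdown, and log the exit in a finally block. A minimal sketch of that shape follows; the class and method names are illustrative only, not the actual HBase types.

    // Illustrative sketch only -- hypothetical class, not the HBase Chore/Worker types.
    import java.util.concurrent.atomic.AtomicBoolean;

    public abstract class ResilientWorker extends Thread {
      private final AtomicBoolean stop = new AtomicBoolean(false);

      /** One unit of work; may fail without killing the thread. */
      protected abstract void doWork() throws Exception;

      @Override
      public void run() {
        try {
          while (!stop.get()) {
            try {
              doWork();
            } catch (Exception e) {
              // A single bad iteration is logged and the loop keeps running.
              System.err.println(getName() + " caught exception: " + e);
            }
          }
        } catch (Throwable t) {
          // Errors (OutOfMemoryError, linkage problems) are not recoverable here:
          // note the failure and ask the loop to stop cleanly.
          System.err.println(getName() + " caught error, starting shutdown: " + t);
          stop.set(true);
        } finally {
          System.out.println(getName() + " exiting");
        }
      }

      public void requestStop() {
        stop.set(true);
      }
    }
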
parent 2f06f08d95
commit d929e0c34a
@@ -151,6 +151,7 @@ Trunk (unreleased changes)
    HADOOP-2706 HBase Shell crash
    HADOOP-2712 under load, regions won't split
    HADOOP-2675 Options not passed to rest/thrift
+   HADOOP-2722 Prevent unintentional thread exit in region server and master

  IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

@@ -58,10 +58,18 @@ public abstract class Chore extends Thread {
       }
       this.sleeper.sleep();
       while(!this.stop.get()) {
-        long startTime = System.currentTimeMillis();
-        chore();
-        this.sleeper.sleep(startTime);
+        try {
+          long startTime = System.currentTimeMillis();
+          chore();
+          this.sleeper.sleep(startTime);
+        } catch (Exception e) {
+          LOG.error("Caught exception", e);
+        }
       }
     } catch (Throwable t) {
       LOG.fatal("Caught error. Starting shutdown.", t);
+      this.stop.set(true);
+
+    } finally {
+      LOG.info(getName() + " exiting");
     }

@@ -1094,11 +1094,14 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
           } catch (InterruptedException e) {
             throw new RuntimeException(
                 "Putting into toDoQueue was interrupted.", e);
+          } catch (Exception e) {
+            LOG.error("main processing loop: " + op.toString(), e);
           }
         }
       }
     } catch (Throwable t) {
-      LOG.fatal("Unhandled exception", t);
+      LOG.fatal("Unhandled exception. Starting shutdown.", t);
+      this.closed.set(true);
     }
     // The region servers won't all exit until we stop scanning the meta regions
     stopScanners();

@@ -152,7 +152,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {

     /** {@inheritDoc} */
     @Override
-    public synchronized void start() {
+    public void run() {
       LOG.info("Starting shutdown thread.");

       // tell the region server to stop and wait for it to complete

@@ -278,31 +278,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         QueueEntry e = null;
         try {
           e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);

+          if (e == null) {
+            continue;
+          }
+          synchronized (splitterLock) { // Don't interrupt us while we're working
+            split(e.getRegion());
+          }
         } catch (InterruptedException ex) {
           continue;
-        }
-        if (e == null) {
-          continue;
-        }
-        synchronized (splitterLock) { // Don't interrupt us while we're working
-          try {
-            split(e.getRegion());
-
-          } catch (IOException ex) {
-            LOG.error("Split failed for region " +
-              e.getRegion().getRegionName(),
-              RemoteExceptionHandler.checkIOException(ex));
-            if (!checkFileSystem()) {
-              break;
-            }
-
-          } catch (Exception ex) {
-            LOG.error("Split failed on region " +
-              e.getRegion().getRegionName(), ex);
-            if (!checkFileSystem()) {
-              break;
-            }
-          }
-        }
+        } catch (IOException ex) {
+          LOG.error("Split failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            RemoteExceptionHandler.checkIOException(ex));
+          if (!checkFileSystem()) {
+            break;
+          }
+        } catch (Exception ex) {
+          LOG.error("Split failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            ex);
+          if (!checkFileSystem()) {
+            break;
+          }
+        }
       }
     }

@@ -402,29 +399,27 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         QueueEntry e = null;
         try {
           e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);

-        } catch (InterruptedException ex) {
-          continue;
-        }
-        if (e == null) {
-          continue;
-        }
-        try {
+          if (e == null) {
+            continue;
+          }
           if (e.getRegion().compactIfNeeded()) {
             splitter.splitRequested(e);
           }
-
+        } catch (InterruptedException ex) {
+          continue;
         } catch (IOException ex) {
-          LOG.error("Compaction failed for region " +
-            e.getRegion().getRegionName(),
+          LOG.error("Compaction failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
             RemoteExceptionHandler.checkIOException(ex));
           if (!checkFileSystem()) {
             break;
           }

         } catch (Exception ex) {
-          LOG.error("Compaction failed for region " +
-            e.getRegion().getRegionName(), ex);
+          LOG.error("Compaction failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            ex);
           if (!checkFileSystem()) {
             break;
           }

@@ -469,47 +464,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         QueueEntry e = null;
         try {
           e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-
-        } catch (InterruptedException ex) {
-          continue;
-
-        } catch (ConcurrentModificationException ex) {
-          continue;
-
-        }
-        synchronized(cacheFlusherLock) { // Don't interrupt while we're working
-          if (e != null) {
-            try {
-              if (e.getRegion().flushcache()) {
-                compactor.compactionRequested(e);
-              }
-
-            } catch (DroppedSnapshotException ex) {
-              // Cache flush can fail in a few places. If it fails in a critical
-              // section, we get a DroppedSnapshotException and a replay of hlog
-              // is required. Currently the only way to do this is a restart of
-              // the server.
-              LOG.fatal("Replay of hlog required. Forcing server restart", ex);
-              if (!checkFileSystem()) {
-                break;
-              }
-              HRegionServer.this.stop();
-
-            } catch (IOException ex) {
-              LOG.error("Cache flush failed for region " +
-                e.getRegion().getRegionName(),
-                RemoteExceptionHandler.checkIOException(ex));
-              if (!checkFileSystem()) {
-                break;
-              }
-
-            } catch (Exception ex) {
-              LOG.error("Cache flush failed for region " +
-                e.getRegion().getRegionName(), ex);
-              if (!checkFileSystem()) {
-                break;
-              }
+          if (e == null) {
+            continue;
+          }
+          synchronized(cacheFlusherLock) { // Don't interrupt while we're working
+            if (e.getRegion().flushcache()) {
+              compactor.compactionRequested(e);
+            }

             e.setExpirationTime(System.currentTimeMillis() +
               optionalFlushPeriod);
             flushQueue.add(e);

@@ -537,6 +499,38 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
           }
-        }
+          }
+        } catch (InterruptedException ex) {
+          continue;
+
+        } catch (ConcurrentModificationException ex) {
+          continue;
+
+        } catch (DroppedSnapshotException ex) {
+          // Cache flush can fail in a few places. If it fails in a critical
+          // section, we get a DroppedSnapshotException and a replay of hlog
+          // is required. Currently the only way to do this is a restart of
+          // the server.
+          LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+          if (!checkFileSystem()) {
+            break;
+          }
+          HRegionServer.this.stop();
+
+        } catch (IOException ex) {
+          LOG.error("Cache flush failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            RemoteExceptionHandler.checkIOException(ex));
+          if (!checkFileSystem()) {
+            break;
+          }
+
+        } catch (Exception ex) {
+          LOG.error("Cache flush failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            ex);
+          if (!checkFileSystem()) {
+            break;
+          }
        }
      }
      flushQueue.clear();

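The three queue-driven worker threads above (splitter, compactor, cache flusher) all move to the same shape: poll and process inside a single try per iteration, with the catch clauses at the end of the loop body and a null guard in the log message because the catch now also covers the poll. A minimal sketch of that shape, using hypothetical names rather than the real HBase types:

    // Sketch only: hypothetical queue worker illustrating the consolidated
    // try/catch shape used by the splitter/compactor/flusher threads above.
    import java.io.IOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    class QueueWorker implements Runnable {
      private final BlockingQueue<String> queue;
      private volatile boolean stopRequested = false;

      QueueWorker(BlockingQueue<String> queue) {
        this.queue = queue;
      }

      private void process(String item) throws IOException {
        // real work would go here
      }

      public void run() {
        while (!stopRequested) {
          String item = null;
          try {
            item = queue.poll(1000, TimeUnit.MILLISECONDS);
            if (item == null) {
              continue;                 // nothing to do this round
            }
            process(item);
          } catch (InterruptedException ex) {
            continue;                   // woken early; just loop again
          } catch (IOException ex) {
            // the catch covers the whole loop body, including the poll,
            // so guard against item still being null when logging
            System.err.println("Processing failed" +
                (item != null ? (" for " + item) : "") + ": " + ex);
          } catch (Exception ex) {
            System.err.println("Processing failed" +
                (item != null ? (" for " + item) : "") + ": " + ex);
          }
        }
        System.out.println("worker exiting");
      }
    }
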
@@ -811,13 +805,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
           }
           // Reset tries count if we had a successful transaction.
           tries = 0;
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
+        } catch (Exception e) {
+          if (e instanceof IOException) {
+            e = RemoteExceptionHandler.checkIOException((IOException) e);
+          }
           if(tries < this.numRetries) {
             LOG.warn("Processing message (Retry: " + tries + ")", e);
             tries++;
           } else {
-            LOG.error("Exceeded max retries: " + this.numRetries, e);
+            LOG.fatal("Exceeded max retries: " + this.numRetries, e);
             if (!checkFileSystem()) {
               continue;
             }

@@ -826,10 +822,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
             }
           }
         }
-
         this.sleeper.sleep(lastMsg);
-      } // while (!stopRequested.get())
-      }
+        } // for
+      } // while (!stopRequested.get())
     } catch (Throwable t) {
       LOG.fatal("Unhandled exception. Aborting...", t);
       abort();

@@ -1148,19 +1143,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     /** {@inheritDoc} */
     public void run() {
       try {
-        for(ToDoEntry e = null; !stopRequested.get(); ) {
+        while(!stopRequested.get()) {
+          ToDoEntry e = null;
           try {
             e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException ex) {
-            // continue
-          }
-          if(e == null || stopRequested.get()) {
-            continue;
-          }
-          try {
+            if(e == null || stopRequested.get()) {
+              continue;
+            }
             LOG.info(e.msg.toString());
             switch(e.msg.getMsg()) {

             case HMsg.MSG_REGIONSERVER_QUIESCE:
               closeUserRegions();
               break;

@@ -1185,19 +1177,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
                 "Impossible state during msg processing. Instruction: "
                 + e.msg.toString());
             }
-          } catch (IOException ie) {
-            ie = RemoteExceptionHandler.checkIOException(ie);
-            if(e.tries < numRetries) {
-              LOG.warn(ie);
+          } catch (InterruptedException ex) {
+            // continue
+          } catch (Exception ex) {
+            if (ex instanceof IOException) {
+              ex = RemoteExceptionHandler.checkIOException((IOException) ex);
+            }
+            if(e != null && e.tries < numRetries) {
+              LOG.warn(ex);
               e.tries++;
               try {
                 toDo.put(e);
-              } catch (InterruptedException ex) {
+              } catch (InterruptedException ie) {
                 throw new RuntimeException("Putting into msgQueue was " +
                   "interrupted.", ex);
               }
             } else {
-              LOG.error("unable to process message: " + e.msg.toString(), ie);
+              LOG.error("unable to process message" +
+                (e != null ? (": " + e.msg.toString()) : ""), ex);
               if (!checkFileSystem()) {
                 break;
               }

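The last two hunks give the region server's Worker the same treatment: the broadened catch unwraps remote IOExceptions, re-queues a failed entry while it still has retries left, and only then gives up and checks the filesystem. A rough sketch of that retry bookkeeping, with hypothetical types standing in for the real ToDoEntry and message classes:

    // Sketch only: bounded retry/requeue bookkeeping with hypothetical types.
    import java.util.concurrent.BlockingQueue;

    class RetryLoop {
      static class Entry {
        final String msg;
        int tries;
        Entry(String msg) { this.msg = msg; }
      }

      static void handleFailure(Entry e, Exception ex, int numRetries,
          BlockingQueue<Entry> toDo) {
        if (e != null && e.tries < numRetries) {
          e.tries++;                    // spend one retry and put the entry back
          try {
            toDo.put(e);
          } catch (InterruptedException ie) {
            throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
          }
        } else {
          // out of retries (or no entry at all): report and move on
          System.err.println("unable to process message" +
              (e != null ? (": " + e.msg) : "") + ", cause: " + ex);
        }
      }
    }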