HBASE-10 HRegionServer hangs upon exit due to DFSClient Exception
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@649373 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ef5bb6f316
commit
5af9719de3
|
@ -10,6 +10,7 @@ Hbase Change Log
|
|||
HBASE-582 HBase 554 forgot to clear results on each iteration caused by a filter
|
||||
(Clint Morgan via Stack)
|
||||
HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions
|
||||
HBASE-10 HRegionServer hangs upon exit due to DFSClient Exception
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-559 MR example job to count table rows
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.HashSet;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -48,7 +49,7 @@ implements RegionUnavailableListener, HConstants {
|
|||
private HTable meta = null;
|
||||
private volatile long startTime;
|
||||
private final long frequency;
|
||||
private final Integer lock = new Integer(0);
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private final HRegionServer server;
|
||||
private final HBaseConfiguration conf;
|
||||
|
@ -79,12 +80,15 @@ implements RegionUnavailableListener, HConstants {
|
|||
synchronized (regionsInQueue) {
|
||||
regionsInQueue.remove(r);
|
||||
}
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
// Don't interrupt us while we are working
|
||||
Text midKey = r.compactStores();
|
||||
if (midKey != null) {
|
||||
split(r, midKey);
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
|
@ -218,9 +222,9 @@ implements RegionUnavailableListener, HConstants {
|
|||
/**
|
||||
* Only interrupt once it's done with a run through the work loop.
|
||||
*/
|
||||
void interruptPolitely() {
|
||||
synchronized (lock) {
|
||||
interrupt();
|
||||
void interruptIfNecessary() {
|
||||
if (lock.tryLock()) {
|
||||
this.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
@ -48,7 +49,7 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
private final long threadWakeFrequency;
|
||||
private final long optionalFlushPeriod;
|
||||
private final HRegionServer server;
|
||||
private final Integer lock = new Integer(0);
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private final Integer memcacheSizeLock = new Integer(0);
|
||||
private long lastOptionalCheck = System.currentTimeMillis();
|
||||
|
||||
|
@ -84,7 +85,10 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
try {
|
||||
enqueueOptionalFlushRegions();
|
||||
r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
||||
if (!flushImmediately(r)) {
|
||||
if (r == null) {
|
||||
continue;
|
||||
}
|
||||
if (!flushRegion(r, false)) {
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
|
@ -118,33 +122,52 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
/**
|
||||
* Only interrupt once it's done with a run through the work loop.
|
||||
*/
|
||||
void interruptPolitely() {
|
||||
synchronized (lock) {
|
||||
interrupt();
|
||||
void interruptIfNecessary() {
|
||||
if (lock.tryLock()) {
|
||||
this.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush a region right away, while respecting concurrency with the async
|
||||
* flushing that is always going on.
|
||||
*
|
||||
* @param region the region to be flushed
|
||||
* @param removeFromQueue true if the region needs to be removed from the
|
||||
* flush queue. False if called from the main run loop and true if called from
|
||||
* flushSomeRegions to relieve memory pressure from the region server.
|
||||
*
|
||||
* <p>In the main run loop, regions have already been removed from the flush
|
||||
* queue, and if this method is called for the relief of memory pressure,
|
||||
* this may not be necessarily true. We want to avoid trying to remove
|
||||
* region from the queue because if it has already been removed, it reqires a
|
||||
* sequential scan of the queue to determine that it is not in the queue.
|
||||
*
|
||||
* <p>If called from flushSomeRegions, the region may be in the queue but
|
||||
* it may have been determined that the region had a significant amout of
|
||||
* memory in use and needed to be flushed to relieve memory pressure. In this
|
||||
* case, its flush may preempt the pending request in the queue, and if so,
|
||||
* it needs to be removed from the queue to avoid flushing the region multiple
|
||||
* times.
|
||||
*
|
||||
* @return true if the region was successfully flushed, false otherwise. If
|
||||
* false, there will be accompanying log messages explaining why the log was
|
||||
* not flushed.
|
||||
*/
|
||||
private boolean flushImmediately(HRegion region) {
|
||||
try {
|
||||
if (region != null) {
|
||||
private boolean flushRegion(HRegion region, boolean removeFromQueue) {
|
||||
synchronized (regionsInQueue) {
|
||||
// take the region out of the set and the queue, if it happens to be
|
||||
// in the queue. this didn't used to be a constraint, but now that
|
||||
// HBASE-512 is in play, we need to try and limit double-flushing
|
||||
// regions.
|
||||
regionsInQueue.remove(region);
|
||||
// take the region out of the set. If removeFromQueue is true, remove it
|
||||
// from the queue too if it is there. This didn't used to be a constraint,
|
||||
// but now that HBASE-512 is in play, we need to try and limit
|
||||
// double-flushing of regions.
|
||||
if (regionsInQueue.remove(region) && removeFromQueue) {
|
||||
flushQueue.remove(region);
|
||||
}
|
||||
synchronized (lock) { // Don't interrupt while we're working
|
||||
lock.lock();
|
||||
try {
|
||||
if (region.flushcache()) {
|
||||
server.compactSplitThread.compactionRequested(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (DroppedSnapshotException ex) {
|
||||
// Cache flush can fail in a few places. If it fails in a critical
|
||||
// section, we get a DroppedSnapshotException and a replay of hlog
|
||||
|
@ -155,6 +178,7 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
return false;
|
||||
}
|
||||
server.stop();
|
||||
return false;
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cache flush failed" +
|
||||
(region != null ? (" for region " + region.getRegionName()) : ""),
|
||||
|
@ -162,6 +186,9 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
if (!server.checkFileSystem()) {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -223,7 +250,10 @@ class Flusher extends Thread implements CacheFlushListener {
|
|||
// flush the region with the biggest memcache
|
||||
HRegion biggestMemcacheRegion =
|
||||
sortedRegions.remove(sortedRegions.firstKey());
|
||||
flushImmediately(biggestMemcacheRegion);
|
||||
if (!flushRegion(biggestMemcacheRegion, true)) {
|
||||
// Something bad happened - give up.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Member;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
|
@ -47,7 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.dfs.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
|
@ -190,7 +188,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
// eclipse warning when accessed by inner classes
|
||||
protected HLog log;
|
||||
final LogRoller logRoller;
|
||||
final Integer logRollerLock = new Integer(0);
|
||||
|
||||
// flag set after we're done setting up server threads (used for testing)
|
||||
protected volatile boolean isOnline;
|
||||
|
@ -323,7 +320,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
// this message.
|
||||
if (checkFileSystem()) {
|
||||
closeAllRegions();
|
||||
synchronized (logRollerLock) {
|
||||
try {
|
||||
log.closeAndDelete();
|
||||
} catch (Exception e) {
|
||||
|
@ -339,7 +335,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
LOG.fatal("error restarting server", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
reportForDuty(sleeper);
|
||||
restart = true;
|
||||
} else {
|
||||
|
@ -422,11 +417,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
|
||||
// Send interrupts to wake up threads if sleeping so they notice shutdown.
|
||||
// TODO: Should we check they are alive? If OOME could have exited already
|
||||
cacheFlusher.interruptPolitely();
|
||||
compactSplitThread.interruptPolitely();
|
||||
synchronized (logRollerLock) {
|
||||
this.logRoller.interrupt();
|
||||
}
|
||||
cacheFlusher.interruptIfNecessary();
|
||||
compactSplitThread.interruptIfNecessary();
|
||||
this.logRoller.interruptIfNecessary();
|
||||
|
||||
if (abortRequested) {
|
||||
if (this.fsOk) {
|
||||
|
@ -470,7 +463,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
LOG.info("stopping server at: " +
|
||||
serverInfo.getServerAddress().toString());
|
||||
}
|
||||
|
||||
join();
|
||||
LOG.info(Thread.currentThread().getName() + " exiting");
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -29,21 +31,20 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
|||
/** Runs periodically to determine if the HLog should be rolled */
|
||||
class LogRoller extends Thread implements LogRollListener {
|
||||
static final Log LOG = LogFactory.getLog(LogRoller.class);
|
||||
private final Integer rollLock = new Integer(0);
|
||||
private final ReentrantLock rollLock = new ReentrantLock();
|
||||
private final long optionalLogRollInterval;
|
||||
private long lastLogRollTime;
|
||||
private volatile boolean rollLog;
|
||||
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
||||
private final HRegionServer server;
|
||||
private final HBaseConfiguration conf;
|
||||
|
||||
/** constructor */
|
||||
/** @param server */
|
||||
public LogRoller(final HRegionServer server) {
|
||||
super();
|
||||
this.server = server;
|
||||
conf = server.conf;
|
||||
this.optionalLogRollInterval = conf.getLong(
|
||||
"hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
|
||||
this.rollLog = false;
|
||||
lastLogRollTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
@ -51,29 +52,29 @@ class LogRoller extends Thread implements LogRollListener {
|
|||
@Override
|
||||
public void run() {
|
||||
while (!server.isStopRequested()) {
|
||||
while (!rollLog && !server.isStopRequested()) {
|
||||
while (!rollLog.get() && !server.isStopRequested()) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
|
||||
rollLog = true;
|
||||
rollLog.set(true);
|
||||
this.lastLogRollTime = now;
|
||||
} else {
|
||||
synchronized (rollLock) {
|
||||
synchronized (rollLog) {
|
||||
try {
|
||||
rollLock.wait(server.threadWakeFrequency);
|
||||
rollLog.wait(server.threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!rollLog) {
|
||||
if (!rollLog.get()) {
|
||||
// There's only two reasons to break out of the while loop.
|
||||
// 1. Log roll requested
|
||||
// 2. Stop requested
|
||||
// so if a log roll was not requested, continue and break out of loop
|
||||
continue;
|
||||
}
|
||||
synchronized (server.logRollerLock) {
|
||||
rollLock.lock(); // Don't interrupt us. We're working
|
||||
try {
|
||||
LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
|
||||
server.getLog().rollWriter();
|
||||
|
@ -85,17 +86,28 @@ class LogRoller extends Thread implements LogRollListener {
|
|||
LOG.error("Log rolling failed", ex);
|
||||
server.checkFileSystem();
|
||||
} finally {
|
||||
rollLog = false;
|
||||
}
|
||||
rollLog.set(false);
|
||||
rollLock.unlock();
|
||||
}
|
||||
}
|
||||
LOG.info("LogRoller exiting.");
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void logRollRequested() {
|
||||
synchronized (rollLock) {
|
||||
rollLog = true;
|
||||
rollLock.notifyAll();
|
||||
synchronized (rollLog) {
|
||||
rollLog.set(true);
|
||||
rollLog.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by region server to wake up this thread if it sleeping.
|
||||
* It is sleeping if rollLock is not held.
|
||||
*/
|
||||
public void interruptIfNecessary() {
|
||||
if (rollLock.tryLock()) {
|
||||
this.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue