HBASE-1062 Compactions at (re)start on a large table can overwhelm DFS
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@729560 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
39b36d225c
commit
c4aa758efe
@ -190,6 +190,7 @@ Release 0.19.0 - Unreleased
|
|||||||
HBASE-1069 Show whether HRegion major compacts or not in INFO level
|
HBASE-1069 Show whether HRegion major compacts or not in INFO level
|
||||||
HBASE-1066 Master should support close/open/reassignment/enable/disable
|
HBASE-1066 Master should support close/open/reassignment/enable/disable
|
||||||
operations on individual regions
|
operations on individual regions
|
||||||
|
HBASE-1062 Compactions at (re)start on a large table can overwhelm DFS
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
|
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
|
||||||
|
@ -224,6 +224,12 @@
|
|||||||
Used as sleep interval by service threads such as META scanner and log roller.
|
Used as sleep interval by service threads such as META scanner and log roller.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.regionserver.safemode.period</name>
|
||||||
|
<value>120000</value>
|
||||||
|
<description>Time to wait on regionserver startup before beginning
|
||||||
|
compactions and memcache flushes.</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.hregion.memcache.flush.size</name>
|
<name>hbase.hregion.memcache.flush.size</name>
|
||||||
<value>67108864</value>
|
<value>67108864</value>
|
||||||
|
@ -60,7 +60,9 @@ class CompactSplitThread extends Thread implements HConstants {
|
|||||||
new LinkedBlockingQueue<HRegion>();
|
new LinkedBlockingQueue<HRegion>();
|
||||||
|
|
||||||
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
|
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
|
||||||
|
|
||||||
|
private volatile int limit = 1;
|
||||||
|
|
||||||
/** @param server */
|
/** @param server */
|
||||||
public CompactSplitThread(HRegionServer server) {
|
public CompactSplitThread(HRegionServer server) {
|
||||||
super();
|
super();
|
||||||
@ -73,9 +75,25 @@ class CompactSplitThread extends Thread implements HConstants {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(this.frequency);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int count = 0;
|
||||||
while (!this.server.isStopRequested()) {
|
while (!this.server.isStopRequested()) {
|
||||||
HRegion r = null;
|
HRegion r = null;
|
||||||
try {
|
try {
|
||||||
|
if ((limit > 0) && (++count > limit)) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(this.frequency);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
count = 0;
|
||||||
|
}
|
||||||
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
||||||
if (r != null && !this.server.isStopRequested()) {
|
if (r != null && !this.server.isStopRequested()) {
|
||||||
synchronized (regionsInQueue) {
|
synchronized (regionsInQueue) {
|
||||||
@ -195,7 +213,15 @@ class CompactSplitThread extends Thread implements HConstants {
|
|||||||
|
|
||||||
// Do not serve the new regions. Let the Master assign them.
|
// Do not serve the new regions. Let the Master assign them.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the number of compactions allowed per cycle.
|
||||||
|
* @param limit the number of compactions allowed, or -1 to unlimit
|
||||||
|
*/
|
||||||
|
void setLimit(int limit) {
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only interrupt once it's done with a run through the work loop.
|
* Only interrupt once it's done with a run through the work loop.
|
||||||
*/
|
*/
|
||||||
|
@ -117,7 +117,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
|
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
protected final AtomicBoolean quiesced = new AtomicBoolean(false);
|
protected final AtomicBoolean quiesced = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
protected final AtomicBoolean safeMode = new AtomicBoolean(true);
|
||||||
|
|
||||||
// Go down hard. Used if file system becomes unavailable and also in
|
// Go down hard. Used if file system becomes unavailable and also in
|
||||||
// debugging and unit tests.
|
// debugging and unit tests.
|
||||||
protected volatile boolean abortRequested;
|
protected volatile boolean abortRequested;
|
||||||
@ -197,6 +199,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
final LogRoller logRoller;
|
final LogRoller logRoller;
|
||||||
final LogFlusher logFlusher;
|
final LogFlusher logFlusher;
|
||||||
|
|
||||||
|
// safemode processing
|
||||||
|
SafeModeThread safeModeThread;
|
||||||
|
|
||||||
// flag set after we're done setting up server threads (used for testing)
|
// flag set after we're done setting up server threads (used for testing)
|
||||||
protected volatile boolean isOnline;
|
protected volatile boolean isOnline;
|
||||||
|
|
||||||
@ -433,8 +438,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
checkFileSystem();
|
checkFileSystem();
|
||||||
}
|
}
|
||||||
if (this.stopRequested.get()) {
|
if (this.stopRequested.get()) {
|
||||||
LOG.info("Stop was requested, clearing the toDo " +
|
LOG.info("Stop was requested, clearing the toDo " +
|
||||||
"despite of the exception");
|
"despite of the exception");
|
||||||
toDo.clear();
|
toDo.clear();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -718,6 +723,63 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
return this.fsOk;
|
return this.fsOk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread for toggling safemode after some configurable interval.
|
||||||
|
*/
|
||||||
|
private class SafeModeThread extends Thread {
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
// first, wait the required interval before turning off safemode
|
||||||
|
int safemodeInterval =
|
||||||
|
conf.getInt("hbase.regionserver.safemode.period", 120 * 1000);
|
||||||
|
try {
|
||||||
|
Thread.sleep(safemodeInterval);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// turn off safemode and limits on the way out due to some kind of
|
||||||
|
// abnormal condition so we do not prevent such things as memcache
|
||||||
|
// flushes and worsen the situation
|
||||||
|
safeMode.set(false);
|
||||||
|
compactSplitThread.setLimit(-1);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(this.getName() + " exiting on interrupt");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("leaving safe mode");
|
||||||
|
safeMode.set(false);
|
||||||
|
|
||||||
|
// now that safemode is off, slowly increase the per-cycle compaction
|
||||||
|
// limit, finally setting it to unlimited (-1)
|
||||||
|
int compactionCheckInterval =
|
||||||
|
conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
|
||||||
|
20 * 1000);
|
||||||
|
final int limitSteps[] = {
|
||||||
|
1, 1, 1, 1,
|
||||||
|
2, 2, 2, 2, 2, 2,
|
||||||
|
3, 3, 3, 3, 3, 3, 3, 3,
|
||||||
|
4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
|
||||||
|
-1
|
||||||
|
};
|
||||||
|
for (int i = 0; i < limitSteps.length; i++) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("setting compaction limit to " + limitSteps[i]);
|
||||||
|
}
|
||||||
|
compactSplitThread.setLimit(limitSteps[i]);
|
||||||
|
try {
|
||||||
|
Thread.sleep(compactionCheckInterval);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// unlimit compactions before exiting
|
||||||
|
compactSplitThread.setLimit(-1);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(this.getName() + " exiting on interrupt");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("compactions no longer limited");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Thread to shutdown the region server in an orderly manner. This thread
|
* Thread to shutdown the region server in an orderly manner. This thread
|
||||||
* is registered as a shutdown hook in the HRegionServer constructor and is
|
* is registered as a shutdown hook in the HRegionServer constructor and is
|
||||||
@ -937,6 +999,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
this.infoServer.setAttribute("regionserver", this);
|
this.infoServer.setAttribute("regionserver", this);
|
||||||
this.infoServer.start();
|
this.infoServer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set up the safe mode handler if safe mode has been configured.
|
||||||
|
if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) {
|
||||||
|
safeMode.set(false);
|
||||||
|
compactSplitThread.setLimit(-1);
|
||||||
|
LOG.debug("skipping safe mode");
|
||||||
|
} else {
|
||||||
|
this.safeModeThread = new SafeModeThread();
|
||||||
|
Threads.setDaemonThreadRunning(this.safeModeThread, n + ".safeMode",
|
||||||
|
handler);
|
||||||
|
}
|
||||||
|
|
||||||
// Start Server. This service is like leases in that it internally runs
|
// Start Server. This service is like leases in that it internally runs
|
||||||
// a thread.
|
// a thread.
|
||||||
this.server.start();
|
this.server.start();
|
||||||
@ -1304,7 +1378,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
|
|
||||||
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
|
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HRegion region = this.removeFromOnlineRegions(hri);
|
HRegion region = this.removeFromOnlineRegions(hri);
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
region.close();
|
region.close();
|
||||||
if(reportWhenCompleted) {
|
if(reportWhenCompleted) {
|
||||||
@ -1842,7 +1916,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||||||
public boolean isStopRequested() {
|
public boolean isStopRequested() {
|
||||||
return stopRequested.get();
|
return stopRequested.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the region server is in safe mode
|
||||||
|
*/
|
||||||
|
public boolean isInSafeMode() {
|
||||||
|
return safeMode.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @return the configuration
|
* @return the configuration
|
||||||
|
@ -120,6 +120,13 @@ class MemcacheFlusher extends Thread implements FlushRequester {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(threadWakeFrequency);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
while (!server.isStopRequested()) {
|
while (!server.isStopRequested()) {
|
||||||
HRegion r = null;
|
HRegion r = null;
|
||||||
try {
|
try {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user