HADOOP-2731 Under load, regions become extremely large and eventually cause

region servers to become unresponsive


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk/src/contrib/hbase@617720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-02-02 00:36:53 +00:00
parent 5a033b71f4
commit 3e351091b6
10 changed files with 168 additions and 224 deletions

View File

@ -13,6 +13,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
HADOOP-2731 Under load, regions become extremely large and eventually cause
region servers to become unresponsive
IMPROVEMENTS
HADOOP-2555 Refactor the HTable#get and HTable#getRow methods to avoid

View File

@ -164,7 +164,7 @@
</property>
<property>
<name>hbase.hregion.memcache.block.multiplier</name>
<value>2</value>
<value>1</value>
<description>
Block updates if memcache has hbase.hregion.block.memcache
time hbase.hregion.flush.size bytes. Useful preventing
@ -179,7 +179,7 @@
<value>268435456</value>
<description>
Maximum HStoreFile size. If any one of a column families' HStoreFiles has
grown to exceed value + (value / 2), the hosting HRegion is split in two.
grown to exceed this value, the hosting HRegion is split in two.
Default: 256M.
</description>
</property>
@ -200,7 +200,7 @@
</property>
<property>
<name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
<value>15000</value>
<value>20000</value>
<description>How often a region server runs the split/compaction check.
</description>
</property>

View File

@ -262,9 +262,12 @@ public class HLog implements HConstants {
break;
}
}
LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
if (LOG.isDebugEnabled() && sequenceNumbers.size() > 0) {
LOG.debug("Found " + sequenceNumbers.size() +
" logs to remove " +
"using oldest outstanding seqnum of " +
oldestOutstandingSeqNum + " from region " + oldestRegion);
}
}
if (sequenceNumbers.size() > 0) {
for (Long seq : sequenceNumbers) {

View File

@ -218,8 +218,6 @@ public class HRegion implements HConstants {
volatile boolean compacting = false;
// Gets set in close. If set, cannot compact or flush again.
volatile boolean writesEnabled = true;
// Gets set in close to prevent new compaction starting
volatile boolean disableCompactions = false;
}
volatile WriteState writestate = new WriteState();
@ -319,7 +317,7 @@ public class HRegion implements HConstants {
1024*1024*64);
this.flushListener = listener;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
@ -390,11 +388,11 @@ public class HRegion implements HConstants {
}
synchronized (splitLock) {
synchronized (writestate) {
// Can be a compaction running and it can take a long time to
// complete -- minutes. Meantime, we want flushes to keep happening
// if we are taking on lots of updates. But we don't want another
// compaction to start so set disableCompactions flag.
this.writestate.disableCompactions = true;
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
LOG.debug("compactions and cache flushes disabled for region " +
regionName);
while (writestate.compacting || writestate.flushing) {
LOG.debug("waiting for" +
(writestate.compacting ? " compaction" : "") +
@ -409,11 +407,6 @@ public class HRegion implements HConstants {
// continue
}
}
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
LOG.debug("compactions and cache flushes disabled for region " +
regionName);
}
lock.writeLock().lock();
LOG.debug("new updates and scanners for region " + regionName +
@ -671,8 +664,7 @@ public class HRegion implements HConstants {
(midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) {
return false;
}
long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
boolean split = (biggest.getAggregate() >= triggerSize);
boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize);
if (split) {
if (!biggest.isSplitable()) {
LOG.warn("Region " + getRegionName().toString() +
@ -710,10 +702,6 @@ public class HRegion implements HConstants {
}
}
if (!needsCompaction) {
if (LOG.isDebugEnabled()) {
LOG.debug("region " + regionInfo.getRegionName() +
" does not need compaction");
}
return false;
}
return compactStores();
@ -768,15 +756,13 @@ public class HRegion implements HConstants {
}
try {
synchronized (writestate) {
if (!writestate.compacting && writestate.writesEnabled &&
!this.writestate.disableCompactions) {
if (!writestate.compacting && writestate.writesEnabled) {
writestate.compacting = true;
} else {
LOG.info("NOT compacting region " +
this.regionInfo.getRegionName().toString() + ": compacting=" +
writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled + ", writestate.disableCompactions=" +
this.writestate.disableCompactions);
writestate.writesEnabled);
return false;
}
}

View File

@ -217,77 +217,49 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.expirationTime = expirationTime;
}
}
// Check to see if regions should be split
final Splitter splitter;
// Needed at shutdown. On way out, if can get this lock then we are not in
// middle of a split or compaction: i.e. splits/compactions cannot be
// interrupted.
final Integer splitterLock = new Integer(0);
/** Split regions on request */
class Splitter extends Thread implements RegionUnavailableListener {
private final BlockingQueue<QueueEntry> splitQueue =
new LinkedBlockingQueue<QueueEntry>();
// Compactions
final CompactSplitThread compactSplitThread;
// Needed during shutdown so we send an interrupt after completion of a
// compaction, not in the midst.
final Integer compactSplitLock = new Integer(0);
/** Compact region on request and then run split if appropriate
*/
private class CompactSplitThread extends Thread
implements RegionUnavailableListener {
private HTable root = null;
private HTable meta = null;
private long startTime;
private final long frequency;
private final BlockingQueue<QueueEntry> compactionQueue =
new LinkedBlockingQueue<QueueEntry>();
/** constructor */
public Splitter() {
public CompactSplitThread() {
super();
}
/** {@inheritDoc} */
public void closing(final Text regionName) {
startTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
// Remove region from regions Map and add it to the Map of retiring
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
lock.writeLock().unlock();
}
this.frequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
20 * 1000);
}
/** {@inheritDoc} */
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
retiringRegions.remove(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closed");
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Perform region splits if necessary
*/
@Override
public void run() {
while (!stopRequested.get()) {
QueueEntry e = null;
try {
e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
synchronized (splitterLock) { // Don't interrupt us while we're working
split(e.getRegion());
}
e.getRegion().compactIfNeeded();
split(e.getRegion());
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Split failed" +
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!checkFileSystem()) {
@ -295,7 +267,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
} catch (Exception ex) {
LOG.error("Split failed" +
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!checkFileSystem()) {
@ -307,18 +279,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/**
* @param e entry indicating which region needs to be split
* @param e QueueEntry for region to be compacted
*/
public void splitRequested(QueueEntry e) {
splitQueue.add(e);
public void compactionRequested(QueueEntry e) {
compactionQueue.add(e);
}
void compactionRequested(final HRegion r) {
compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
}
private void split(final HRegion region) throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.splitRegion(this);
if (newRegions == null) {
return; // Didn't need to be split
// Didn't need to be split
return;
}
// When a region is split, the META table needs to updated if we're
@ -374,65 +350,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Do not serve the new regions. Let the Master assign them.
}
}
// Compactions
final Compactor compactor;
// Needed during shutdown so we send an interrupt after completion of a
// compaction, not in the midst.
final Integer compactionLock = new Integer(0);
/** Compact region on request */
class Compactor extends Thread {
private final BlockingQueue<QueueEntry> compactionQueue =
new LinkedBlockingQueue<QueueEntry>();
/** constructor */
public Compactor() {
super();
/** {@inheritDoc} */
public void closing(final Text regionName) {
startTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
// Remove region from regions Map and add it to the Map of retiring
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override
public void run() {
while (!stopRequested.get()) {
QueueEntry e = null;
try {
e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
if (e.getRegion().compactIfNeeded()) {
splitter.splitRequested(e);
}
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!checkFileSystem()) {
break;
}
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
retiringRegions.remove(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closed");
}
} finally {
lock.writeLock().unlock();
}
LOG.info(getName() + " exiting");
}
/**
* @param e QueueEntry for region to be compacted
*/
public void compactionRequested(QueueEntry e) {
compactionQueue.add(e);
}
}
@ -469,7 +415,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
synchronized(cacheFlusherLock) { // Don't interrupt while we're working
if (e.getRegion().flushcache()) {
compactor.compactionRequested(e);
compactSplitThread.compactionRequested(e);
}
e.setExpirationTime(System.currentTimeMillis() +
@ -650,10 +596,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.cacheFlusher = new Flusher();
// Compaction thread
this.compactor = new Compactor();
// Region split thread
this.splitter = new Splitter();
this.compactSplitThread = new CompactSplitThread();
// Log rolling thread
this.logRoller = new LogRoller();
@ -846,11 +789,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
synchronized(cacheFlusherLock) {
this.cacheFlusher.interrupt();
}
synchronized (compactionLock) {
this.compactor.interrupt();
}
synchronized (splitterLock) {
this.splitter.interrupt();
synchronized (compactSplitLock) {
this.compactSplitThread.interrupt();
}
synchronized (logRollerLock) {
this.logRoller.interrupt();
@ -972,9 +912,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
Threads.setDaemonThreadRunning(this.compactor, n + ".compactor",
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
handler);
Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@ -1038,8 +977,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
void join() {
join(this.workerThread);
join(this.cacheFlusher);
join(this.compactor);
join(this.splitter);
join(this.compactSplitThread);
join(this.logRoller);
}
@ -1219,7 +1157,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
),
this.log, this.fs, conf, regionInfo, null, this.cacheFlusher
);
// Startup a compaction early if one is needed.
this.compactSplitThread.compactionRequested(region);
} catch (IOException e) {
LOG.error("error opening region " + regionInfo.getRegionName(), e);

View File

@ -1155,15 +1155,22 @@ public class HStore implements HConstants {
* @return True if this store needs compaction.
*/
boolean needsCompaction() {
boolean compactionNeeded = false;
return this.storefiles != null &&
(this.storefiles.size() >= this.compactionThreshold || hasReferences());
}
/*
* @return True if this store has references.
*/
private boolean hasReferences() {
if (this.storefiles != null) {
compactionNeeded = this.storefiles.size() >= this.compactionThreshold;
if (LOG.isDebugEnabled()) {
LOG.debug("compaction for HStore " + storeName +
(compactionNeeded ? " " : " not ") + "needed.");
for (HStoreFile hsf: this.storefiles.values()) {
if (hsf.isReference()) {
return true;
}
}
}
return compactionNeeded;
return false;
}
/**

View File

@ -68,13 +68,13 @@ public class HStoreKey implements WritableComparable {
/**
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to Long.MAX_VALUE
* The timestamp defaults to LATEST_TIMESTAMP
*
* @param row row key
* @param column column key
*/
public HStoreKey(Text row, Text column) {
this(row, column, Long.MAX_VALUE);
this(row, column, HConstants.LATEST_TIMESTAMP);
}
/**

View File

@ -74,6 +74,9 @@ public class MultiRegionTable extends HBaseTestCase {
HRegionInfo parent =
t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
LOG.info("Parent region " + parent.toString());
Path parentDir = HRegion.getRegionDir(new Path(d, tableName),
parent.getEncodedName());
assertTrue(fs.exists(parentDir));
// Now add content.
addContent(new HTableIncommon(t), columnName);
LOG.info("Finished content loading");
@ -125,72 +128,73 @@ public class MultiRegionTable extends HBaseTestCase {
// region we have been dealing with up to this. Its the parent of the
// region split.
Map<Text, byte []> data = getSplitParentInfo(meta, parent);
parent = Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO));
LOG.info("Found parent region: " + parent);
assertTrue(parent.isOffline());
assertTrue(parent.isSplit());
HRegionInfo splitA =
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
HRegionInfo splitB =
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
Path parentDir = HRegion.getRegionDir(new Path(d, tableName),
parent.getEncodedName());
assertTrue(fs.exists(parentDir));
LOG.info("Split happened. Parent is " + parent.getRegionName());
// Recalibrate will cause us to wait on new regions' deployment
recalibrate(t, new Text(columnName), retries, waitTime);
if (splitA == null) {
LOG.info("splitA was already null. Assuming it was previously compacted.");
if (data == null) {
// We changed stuff so daughters get cleaned up much faster now. Can
// run so fast, parent has been deleted by time we get to here.
} else {
LOG.info("Daughter splitA: " + splitA.getRegionName());
// Compact a region at a time so we can test case where one region has
// no references but the other still has some
compact(cluster, splitA);
// Wait till the parent only has reference to remaining split, one that
// still has references.
while (true) {
data = getSplitParentInfo(meta, parent);
if (data == null || data.size() == 3) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
parent = Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO));
LOG.info("Found parent region: " + parent);
assertTrue(parent.isOffline());
assertTrue(parent.isSplit());
HRegionInfo splitA =
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
HRegionInfo splitB =
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
assertTrue(fs.exists(parentDir));
LOG.info("Split happened. Parent is " + parent.getRegionName());
// Recalibrate will cause us to wait on new regions' deployment
recalibrate(t, new Text(columnName), retries, waitTime);
if (splitA == null) {
LOG.info("splitA was already null. Assuming it was previously compacted.");
} else {
LOG.info("Daughter splitA: " + splitA.getRegionName());
// Compact a region at a time so we can test case where one region has
// no references but the other still has some
compact(cluster, splitA);
// Wait till the parent only has reference to remaining split, one that
// still has references.
while (true) {
data = getSplitParentInfo(meta, parent);
if (data == null || data.size() == 3) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
}
continue;
}
continue;
break;
}
break;
LOG.info("Parent split info returned " + data.keySet().toString());
}
LOG.info("Parent split info returned " + data.keySet().toString());
if (splitB == null) {
LOG.info("splitB was already null. Assuming it was previously compacted.");
} else {
LOG.info("Daughter splitB: " + splitA.getRegionName());
// Call second split.
compact(cluster, splitB);
}
// Now wait until parent disappears.
LOG.info("Waiting on parent " + parent.getRegionName() + " to disappear");
for (int i = 0; i < retries; i++) {
if (getSplitParentInfo(meta, parent) == null) {
break;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
}
}
assertNull(getSplitParentInfo(meta, parent));
}
if (splitB == null) {
LOG.info("splitB was already null. Assuming it was previously compacted.");
} else {
LOG.info("Daughter splitB: " + splitA.getRegionName());
// Call second split.
compact(cluster, splitB);
}
// Now wait until parent disappears.
LOG.info("Waiting on parent " + parent.getRegionName() + " to disappear");
for (int i = 0; i < retries; i++) {
if (getSplitParentInfo(meta, parent) == null) {
break;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
}
}
assertNull(getSplitParentInfo(meta, parent));
// Assert cleaned up.
for (int i = 0; i < retries; i++) {

View File

@ -51,6 +51,7 @@ public class TestCompaction extends HBaseTestCase {
// Set cache flush size to 1MB
conf.setInt("hbase.hregion.memcache.flush.size", 1024*1024);
conf.setInt("hbase.hregion.memcache.block.multiplier", 2);
this.cluster = null;
}

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiSearcher;
@ -83,16 +85,16 @@ public class TestTableIndex extends MultiRegionTable {
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
// Enable DEBUG-level MR logging.
Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG);
// Make sure the cache gets flushed so we trigger a compaction(s) and
// hence splits.
conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024);
// Always compact if there is more than one store file.
conf.setInt("hbase.hstore.compactionThreshold", 2);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 128 * 1024);
conf.setLong("hbase.hregion.max.filesize", 1024 * 1024);
desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));