HBASE-512 Add configuration for global aggregate memcache size

-Refactored Flusher slightly, added reclaimMemcacheMemory method
-HRegionServer calls reclaimMemcacheMemory during batchUpdates
-Added TestGlobalMemcacheLimit to verify new functionality 
-Added new config parameter defaults to hbase-default.xml

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@645740 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bryan Duxbury 2008-04-08 00:09:20 +00:00
parent 7813444a9b
commit 49e0eaf8d9
7 changed files with 318 additions and 45 deletions

View File

@ -18,6 +18,7 @@ Hbase Change Log
NEW FEATURES NEW FEATURES
HBASE-548 Tool to online single region HBASE-548 Tool to online single region
HBASE-71 Master should rebalance region assignments periodically HBASE-71 Master should rebalance region assignments periodically
HBASE-512 Add configuration for global aggregate memcache size
IMPROVEMENTS IMPROVEMENTS
HBASE-469 Streamline HStore startup and compactions HBASE-469 Streamline HStore startup and compactions

View File

@ -261,4 +261,21 @@
<description>TableFormatter to use outputting HQL result sets. <description>TableFormatter to use outputting HQL result sets.
</description> </description>
</property> </property>
<property>
<name>hbase.regionserver.globalMemcacheLimit</name>
<value>536870912</value>
<description>Maximum size of all memcaches in a region server before new
updates are blocked and flushes are forced. Defaults to 512MB.
</description>
</property>
<property>
<name>hbase.regionserver.globalMemcacheLimitlowMark</name>
<value>256435456</value>
<description>When memcaches are being forced to flush to make room in
memory, keep flushing until we hit this mark. Defaults to 256MB. Setting
this value equal to hbase.regionserver.globalmemcachelimit causes the
minimum possible flushing to occur when updates are blocked due to
memcache limiting.
</description>
</property>
</configuration> </configuration>

View File

@ -118,6 +118,16 @@ public class LocalHBaseCluster implements HConstants {
} }
} }
/**
* @param serverNumber
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
synchronized (regionThreads) {
return regionThreads.get(serverNumber).getRegionServer();
}
}
/** runs region servers */ /** runs region servers */
public static class RegionServerThread extends Thread { public static class RegionServerThread extends Thread {
private final HRegionServer regionServer; private final HRegionServer regionServer;

View File

@ -25,6 +25,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Comparator;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -46,6 +49,11 @@ class Flusher extends Thread implements CacheFlushListener {
private final long optionalFlushPeriod; private final long optionalFlushPeriod;
private final HRegionServer server; private final HRegionServer server;
private final Integer lock = new Integer(0); private final Integer lock = new Integer(0);
private final Integer memcacheSizeLock = new Integer(0);
private long lastOptionalCheck = System.currentTimeMillis();
protected final long globalMemcacheLimit;
protected final long globalMemcacheLimitLowMark;
/** /**
* @param conf * @param conf
@ -54,67 +62,35 @@ class Flusher extends Thread implements CacheFlushListener {
public Flusher(final HBaseConfiguration conf, final HRegionServer server) { public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
super(); super();
this.server = server; this.server = server;
this.optionalFlushPeriod = conf.getLong( optionalFlushPeriod = conf.getLong(
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
this.threadWakeFrequency = conf.getLong( threadWakeFrequency = conf.getLong(
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
// default memcache limit of 512MB
globalMemcacheLimit =
conf.getLong("hbase.regionserver.globalMemcacheLimit", 512 * 1024 * 1024);
// default memcache low mark limit of 256MB, which is half the upper limit
globalMemcacheLimitLowMark =
conf.getLong("hbase.regionserver.globalMemcacheLimitLowMark",
globalMemcacheLimit / 2);
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public void run() { public void run() {
long lastOptionalCheck = System.currentTimeMillis();
while (!server.isStopRequested()) { while (!server.isStopRequested()) {
HRegion r = null; HRegion r = null;
try { try {
long now = System.currentTimeMillis(); enqueueOptionalFlushRegions();
if (now - threadWakeFrequency > lastOptionalCheck) {
lastOptionalCheck = now;
// Queue up regions for optional flush if they need it
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion region: regions) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(region) &&
(now - optionalFlushPeriod) > region.getLastFlushTime()) {
regionsInQueue.add(region);
flushQueue.add(region);
region.setLastFlushTime(now);
}
}
}
}
r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (r != null) { if (!flushImmediately(r)) {
synchronized (regionsInQueue) { break;
regionsInQueue.remove(r);
}
synchronized (lock) { // Don't interrupt while we're working
if (r.flushcache()) {
server.compactSplitThread.compactionRequested(r);
}
}
} }
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
continue; continue;
} catch (ConcurrentModificationException ex) { } catch (ConcurrentModificationException ex) {
continue; continue;
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
// is required. Currently the only way to do this is a restart of
// the server.
LOG.fatal("Replay of hlog required. Forcing server restart", ex);
if (!server.checkFileSystem()) {
break;
}
server.stop();
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(r != null ? (" for region " + r.getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Cache flush failed" + LOG.error("Cache flush failed" +
(r != null ? (" for region " + r.getRegionName()) : ""), (r != null ? (" for region " + r.getRegionName()) : ""),
@ -147,4 +123,108 @@ class Flusher extends Thread implements CacheFlushListener {
interrupt(); interrupt();
} }
} }
/**
* Flush a region right away, while respecting concurrency with the async
* flushing that is always going on.
*/
private boolean flushImmediately(HRegion region) {
try {
if (region != null) {
synchronized (regionsInQueue) {
// take the region out of the set and the queue, if it happens to be
// in the queue. this didn't used to be a constraint, but now that
// HBASE-512 is in play, we need to try and limit double-flushing
// regions.
regionsInQueue.remove(region);
flushQueue.remove(region);
}
synchronized (lock) { // Don't interrupt while we're working
if (region.flushcache()) {
server.compactSplitThread.compactionRequested(region);
}
}
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
// is required. Currently the only way to do this is a restart of
// the server.
LOG.fatal("Replay of hlog required. Forcing server restart", ex);
if (!server.checkFileSystem()) {
return false;
}
server.stop();
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(region != null ? (" for region " + region.getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
return false;
}
}
return true;
}
/**
* Find the regions that should be optionally flushed and put them on the
* flush queue.
*/
private void enqueueOptionalFlushRegions() {
long now = System.currentTimeMillis();
if (now - threadWakeFrequency > lastOptionalCheck) {
lastOptionalCheck = now;
// Queue up regions for optional flush if they need it
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion region: regions) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(region) &&
(now - optionalFlushPeriod) > region.getLastFlushTime()) {
regionsInQueue.add(region);
flushQueue.add(region);
region.setLastFlushTime(now);
}
}
}
}
}
/**
* Check if the regionserver's memcache memory usage is greater than the
* limit. If so, flush regions with the biggest memcaches until we're down
* to the lower limit. This method blocks callers until we're down to a safe
* amount of memcache consumption.
*/
public void reclaimMemcacheMemory() {
synchronized (memcacheSizeLock) {
if (server.getGlobalMemcacheSize() >= globalMemcacheLimit) {
flushSomeRegions();
}
}
}
private void flushSomeRegions() {
// we'll sort the regions in reverse
SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
new Comparator<Long>() {
public int compare(Long a, Long b) {
return -1 * a.compareTo(b);
}
}
);
// copy over all the regions
for (HRegion region : server.onlineRegions.values()) {
sortedRegions.put(region.memcacheSize.get(), region);
}
// keep flushing until we hit the low water mark
while (server.getGlobalMemcacheSize() >= globalMemcacheLimitLowMark) {
// flush the region with the biggest memcache
HRegion biggestMemcacheRegion =
sortedRegions.remove(sortedRegions.firstKey());
flushImmediately(biggestMemcacheRegion);
}
}
} }

View File

@ -1098,6 +1098,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.requestCount.incrementAndGet(); this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
try { try {
cacheFlusher.reclaimMemcacheMemory();
region.batchUpdate(b); region.batchUpdate(b);
} catch (IOException e) { } catch (IOException e) {
checkFileSystem(); checkFileSystem();
@ -1405,6 +1406,20 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return this.outboundMsgs; return this.outboundMsgs;
} }
/**
* Return the total size of all memcaches in every region.
* @return memcache size in bytes
*/
public long getGlobalMemcacheSize() {
long total = 0;
synchronized (onlineRegions) {
for (HRegion region : onlineRegions.values()) {
total += region.memcacheSize.get();
}
}
return total;
}
// //
// Main program and support routines // Main program and support routines
// //

View File

@ -161,4 +161,13 @@ public class MiniHBaseCluster implements HConstants {
public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() { public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
return this.hbaseCluster.getRegionServers(); return this.hbaseCluster.getRegionServers();
} }
/**
* Grab a numbered region server of your choice.
* @param serverNumber
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
return hbaseCluster.getRegionServer(serverNumber);
}
} }

View File

@ -0,0 +1,141 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Test setting the global memcache size for a region server. When it reaches
* this size, any puts should be blocked while one or more forced flushes occurs
* to bring the memcache size back down.
*/
public class TestGlobalMemcacheLimit extends HBaseClusterTestCase {
final byte[] ONE_KB = new byte[1024];
HTable table1;
HTable table2;
HRegionServer server;
long keySize = (new Text(COLFAMILY_NAME1)).getLength() + 9 + 8;
long rowSize = keySize + ONE_KB.length;
/**
* Get our hands into the cluster configuration before the hbase cluster
* starts up.
*/
@Override
public void preHBaseClusterSetup() {
// we'll use a 2MB global memcache for testing's sake.
conf.setInt("hbase.regionserver.globalMemcacheLimit", 2 * 1024 * 1024);
// low memcache mark will be 1MB
conf.setInt("hbase.regionserver.globalMemcacheLimitLowMark",
1 * 1024 * 1024);
// make sure we don't do any optional flushes and confuse my tests.
conf.setInt("hbase.regionserver.optionalcacheflushinterval", 120000);
}
/**
* Create a table that we'll use to test.
*/
@Override
public void postHBaseClusterSetup() throws IOException {
HTableDescriptor desc1 = createTableDescriptor("testTable1");
HTableDescriptor desc2 = createTableDescriptor("testTable2");
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc1);
admin.createTable(desc2);
table1 = new HTable(conf, new Text("testTable1"));
table2 = new HTable(conf, new Text("testTable2"));
server = cluster.getRegionServer(0);
// there is a META region in play, and those are probably still in
// the memcache for ROOT. flush it out.
for (HRegion region : server.getOnlineRegions().values()) {
region.flushcache();
}
// make sure we're starting at 0 so that it's easy to predict what the
// results of our tests should be.
assertEquals("Starting memcache size", 0, server.getGlobalMemcacheSize());
}
/**
* Make sure that region server thinks all the memcaches are as big as we were
* hoping they would be.
*/
public void testMemcacheSizeAccounting() throws IOException {
// put some data in each of the two tables
long dataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
// make sure the region server says it is using as much memory as we think
// it is.
assertEquals("Global memcache size", dataSize,
server.getGlobalMemcacheSize());
}
/**
* Test that a put gets blocked and a flush is forced as expected when we
* reach the memcache size limit.
*/
public void testBlocksAndForcesFlush() throws IOException {
// put some data in each of the two tables
long startingDataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
// at this point we have 1052000 bytes in memcache. now, we'll keep adding
// data to one of the tables until just before the global memcache limit,
// noting that the globalMemcacheSize keeps growing as expected. then, we'll
// do another put, causing it to go over the limit. when we look at the
// globablMemcacheSize now, it should be <= the low limit.
long dataNeeded = (2 * 1024 * 1024) - startingDataSize;
double numRows = (double)dataNeeded / (double)rowSize;
int preFlushRows = (int)Math.floor(numRows);
long dataAdded = populate(table1, preFlushRows, 500);
assertEquals("Expected memcache size", dataAdded + startingDataSize,
server.getGlobalMemcacheSize());
populate(table1, 2, preFlushRows + 500);
assertTrue("Post-flush memcache size", server.getGlobalMemcacheSize() <= 1024 * 1024);
}
private long populate(HTable table, int numRows, int startKey) throws IOException {
long total = 0;
BatchUpdate batchUpdate = null;
Text column = new Text(COLFAMILY_NAME1);
for (int i = startKey; i < startKey + numRows; i++) {
Text key = new Text("row_" + String.format("%1$5d", i));
total += key.getLength();
total += column.getLength();
total += 8;
total += ONE_KB.length;
batchUpdate = new BatchUpdate(key);
batchUpdate.put(column, ONE_KB);
table.commit(batchUpdate);
}
return total;
}
}