HADOOP-1644 [hbase] Compactions should not block updates

Disentangles flushes and compactions; flushes can proceed while a
compaction is happening.  Also, don't compact unless we hit
compaction threshold: i.e. don't automatically compact on HRegion
startup so regions can come online the faster.

M src/contrib/hbase/conf/hbase-default.xml
    (hbase.hregion.compactionThreashold): Moved to be a hstore property
    as part of encapsulating compaction decision inside hstore.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    Refactored.  Moved here generalized content loading code that can
    be shared by tests.  Add to setup and teardown the setup and removal
    of local test dir (if it exists).
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java
    Added test of HStoreKey compare (It works other than one would at
    first expect).
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
    Bulk of content loading code has been moved up into the parent class.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
    (tableExists): Restore to a check of if the asked-for table is in list of
    tables.  As it was, a check for tableExists would just wait on all timeouts
    and retries to expire and then report table does not exist..  Fixed up
    debug message listing regions of a table.  Added protection against meta
    table not having a COL_REGINFO (Seen in cluster testing -- probably a bug
    in row removal).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    Loading store files, even if it was noticed that there was no corresponding
    map file, was still counting file as valid.  Also fix merger -- was
    constructing MapFile.Reader directly rather than asking HStoreFile for
    the reader (HStoreFile knows how to do MapFile references)
    (rename): Added check that move succeeded and logging.  In cluster-testing,
    the hdfs move of compacted file into place has failed on occasion (Need
    more info).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    Encapsulate ruling on whether a compaction should take place inside HStore.
    Added reading of the compactionThreshold her.  Compaction threshold is
    currently just number of store files.  Later may include other factors such
    as count of reference files.  Cleaned up debug messages around
    reconstruction log.  Removed compaction if size > 1 from constructor.  Let
    compaction happen after we've been deployed (Compactions that happen while
    we are online can continue to take updates.  Compaction in the constructor
    puts off our being able to take in updates).
    (close): Changed so it now returns set of store files.  This used to be done
    by calls to flush. Since flush and compaction have been disentangled, a
    compaction can come in after flush and the list of files could be off.
    Having it done by close, can be sure list of files is complete.
    (flushCache): No longer returns set of store files.  Added 'merging compaction'
    where we pick an arbitrary store file from disk and merge into it the content
    of memcache (Needs work).
    (getAllMapFiles): Renamed getAllStoreFiles.
    (needsCompaction): Added.
    (compactHelper): Added passing of maximum sequence number if already
    calculated. If compacting one file only, we used skip without rewriting
    the info file.  Fixed.
    Refactored.  Moved guts to new  compact(outFile, listOfStores)  method.
    (compact, CompactionReader): Added overrides and interface  to support
    'merging compaction' that takes files and memcache.  In compaction,
    if we failed the move of the compacted file, all data had already been
    deleted.  Changing, so deletion happens after confirmed move of
    compacted file.
    (getFull): Fixed bug where NPE when read of maps came back null.
    Revealed by our NOT compacting stores on startup.  Meant could be two
    backing stores one of which had no data regards queried key.
    (getNMaps): Renamed countOfStoreFiles.
    (toString): Added.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    Added comment on 'odd'-looking comparison.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    Javadoc edit. 
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
    Only return first 128 bytes of value when toStringing (On cluster,
    was returning complete web pages in log).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    Removed confusing debug message (made sense once -- but not now).
    Test rootRegionLocation for null before using it (can be null).
M  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    Added comment that delete behavior needs study.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    Fixed merge so it doesn't do the incremental based off files
    returned by flush.  Instead all is done in the one go after
    region closes (using files returned by close).
    Moved duplicated code to new filesByFamily method.
    (WriteState): Removed writesOngoing in favor of compacting and
    flushing flags.
    (flushCache): No longer returns list of files.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
    Fix javadoc.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@566459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-08-16 01:07:51 +00:00
parent 6c8e713671
commit be33a241ce
19 changed files with 810 additions and 531 deletions

View File

@ -95,3 +95,4 @@ Trunk (unreleased changes)
58. HADOOP-1710 All updates should be batch updates
59. HADOOP-1711 HTable API should use interfaces instead of concrete classes as
method parameters and return values
60. HADOOP-1644 Compactions should not block updates

View File

@ -147,7 +147,7 @@
</description>
</property>
<property>
<name>hbase.hregion.compactionThreshold</name>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
<description>
If more than this number of HStoreFiles in any one HStore

View File

@ -38,6 +38,7 @@ public interface HConnection {
public boolean isMasterRunning();
/**
* Checks if <code>tableName</code> exists.
* @param tableName Table to check.
* @return True if table exists already.
*/

View File

@ -209,15 +209,19 @@ public class HConnectionManager implements HConstants {
/** {@inheritDoc} */
public boolean tableExists(final Text tableName) {
boolean exists = true;
try {
SortedMap<Text, HRegionLocation> servers = getTableServers(tableName);
if (servers == null || servers.size() == 0) {
exists = false;
if (tableName == null) {
throw new IllegalArgumentException("Table name cannot be null");
}
boolean exists = false;
try {
HTableDescriptor[] tables = listTables();
for (int i = 0; i < tables.length; i++) {
if (tables[i].getName().equals(tableName)) {
exists = true;
}
}
} catch (IOException e) {
exists = false;
LOG.warn("Testing for table existence threw exception", e);
}
return exists;
}
@ -400,7 +404,6 @@ public class HConnectionManager implements HConstants {
throws IOException {
// Wipe out everything we know about this table
if (this.tablesToServers.remove(tableName) != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Wiping out all we know of " + tableName);
@ -524,9 +527,10 @@ public class HConnectionManager implements HConstants {
}
this.tablesToServers.put(tableName, servers);
if (LOG.isDebugEnabled()) {
int count = 0;
for (Map.Entry<Text, HRegionLocation> e: servers.entrySet()) {
LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() +
" for table " + tableName);
LOG.debug("Region " + (1 + count++) + " of " + servers.size() +
": " + e.getValue());
}
}
return servers;
@ -650,16 +654,12 @@ public class HConnectionManager implements HConstants {
new TreeMap<Text, HRegionLocation>();
for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) {
long scannerId = -1L;
try {
scannerId =
server.openScanner(t.getRegionInfo().getRegionName(),
scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
while (true) {
HRegionInfo regionInfo = null;
String serverAddress = null;
KeyedData[] values = server.next(scannerId);
if (values.length == 0) {
if (servers.size() == 0) {
@ -671,19 +671,30 @@ public class HConnectionManager implements HConstants {
// We found at least one server for the table and now we're done.
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + servers.size() + " server(s) for " +
"location: " + t + " for tablename " + tableName);
tableName + " at " + t);
}
break;
}
byte[] bytes = null;
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for (int i = 0; i < values.length; i++) {
results.put(values[i].getKey().getColumn(), values[i].getData());
}
regionInfo = new HRegionInfo();
regionInfo = (HRegionInfo) Writables.getWritable(
results.get(COL_REGIONINFO), regionInfo);
byte[] bytes = results.get(COL_REGIONINFO);
if (bytes == null || bytes.length == 0) {
// This can be null. Looks like an info:splitA or info:splitB
// is only item in the row.
if (LOG.isDebugEnabled()) {
LOG.debug(COL_REGIONINFO.toString() + " came back empty: " +
results.toString());
}
servers.clear();
break;
}
HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
results.get(COL_REGIONINFO), new HRegionInfo());
if (!regionInfo.tableDesc.getName().equals(tableName)) {
// We're done
@ -707,7 +718,8 @@ public class HConnectionManager implements HConstants {
servers.clear();
break;
}
serverAddress = Writables.bytesToString(bytes);
String serverAddress = Writables.bytesToString(bytes);
servers.put(regionInfo.startKey, new HRegionLocation(
regionInfo, new HServerAddress(serverAddress)));
}

View File

@ -34,6 +34,7 @@ public class HLogEdit implements Writable {
private Text column = new Text();
private byte [] val;
private long timestamp;
private final int MAX_VALUE_LEN = 128;
/**
* Default constructor used by Writable
@ -69,17 +70,23 @@ public class HLogEdit implements Writable {
return this.timestamp;
}
/** {@inheritDoc} */
/**
* @return First column name, timestamp, and first 128 bytes of the value
* bytes as a String.
*/
@Override
public String toString() {
String value = "";
try {
value = new String(getVal(), HConstants.UTF8_ENCODING);
value = (this.val.length > MAX_VALUE_LEN)?
new String(this.val, 0, MAX_VALUE_LEN, HConstants.UTF8_ENCODING) +
"...":
new String(getVal(), HConstants.UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("UTF8 encoding not present?", e);
}
return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + value + ")";
return "(" + getColumn().toString() + "/" + getTimestamp() + "/" +
value + ")";
}
// Writable

View File

@ -312,18 +312,16 @@ HMasterRegionInterface, Runnable {
boolean noReferencesB = splitB == null;
if (!noReferencesA) {
noReferencesA =
hasReferences(metaRegionName, server, info.getRegionName(), splitA, COL_SPLITA);
noReferencesA = hasReferences(metaRegionName, server,
info.getRegionName(), splitA, COL_SPLITA);
}
if (!noReferencesB) {
noReferencesB =
hasReferences(metaRegionName, server, info.getRegionName(), splitB, COL_SPLITB);
noReferencesB = hasReferences(metaRegionName, server,
info.getRegionName(), splitB, COL_SPLITB);
}
if (!(noReferencesA && noReferencesB)) {
if (!noReferencesA && !noReferencesB) {
// No references. Remove this item from table and deleted region on
// disk.
LOG.info("Deleting region " + info.getRegionName() +
" because daughter splits no longer hold references");
@ -337,7 +335,6 @@ HMasterRegionInterface, Runnable {
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
result = true;
}
@ -361,8 +358,8 @@ HMasterRegionInterface, Runnable {
Path [] ps = fs.listPaths(p,
new PathFilter () {
public boolean accept(Path path) {
return HStoreFile.isReference(path);
public boolean accept(Path p) {
return HStoreFile.isReference(p);
}
}
);
@ -394,18 +391,11 @@ HMasterRegionInterface, Runnable {
final String serverName, final long startCode) {
// Skip region - if ...
if(info.offLine // offline
|| killedRegions.contains(info.regionName) // queued for offline
|| regionsToDelete.contains(info.regionName)) { // queued for delete
unassignedRegions.remove(info.regionName);
assignAttempts.remove(info.regionName);
if(LOG.isDebugEnabled()) {
LOG.debug("not assigning region: " + info.regionName + " (offline: " +
info.isOffline() + ", split: " + info.isSplit() + ")");
}
return;
}
@ -416,7 +406,6 @@ HMasterRegionInterface, Runnable {
regionsToKill.containsKey(info.regionName)) {
// Skip if region is on kill list
if(LOG.isDebugEnabled()) {
LOG.debug("not assigning region (on kill list): " + info.regionName);
}
@ -431,14 +420,8 @@ HMasterRegionInterface, Runnable {
&& (storedInfo == null || storedInfo.getStartCode() != startCode)) {
// The current assignment is no good; load the region.
unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, Long.valueOf(0L));
} else if (LOG.isDebugEnabled()) {
LOG.debug("Finished if " + info.getRegionName() + " is assigned: " +
"unassigned: " + unassignedRegions.containsKey(info.regionName) +
", pending: " + pendingRegions.contains(info.regionName));
}
}
}
@ -2155,8 +2138,10 @@ HMasterRegionInterface, Runnable {
if (rootRegionLocation.get() == null || !rootScanned) {
// We can't proceed until the root region is online and has been scanned
if (LOG.isDebugEnabled()) {
LOG.debug("root region=" + rootRegionLocation.get().toString() +
", rootScanned=" + rootScanned);
LOG.debug("root region: " +
((rootRegionLocation != null)?
rootRegionLocation.toString(): "null") +
", rootScanned: " + rootScanned);
}
return false;
}

View File

@ -243,6 +243,7 @@ public class HMemcache {
*
* TODO - This is kinda slow. We need a data structure that allows for
* proximity-searches, not just precise-matches.
*
* @param map
* @param key
* @param numVersions
@ -251,13 +252,19 @@ public class HMemcache {
ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
final HStoreKey key, final int numVersions) {
ArrayList<byte []> result = new ArrayList<byte []>();
HStoreKey curKey =
new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
// TODO: If get is of a particular version -- numVersions == 1 -- we
// should be able to avoid all of the tailmap creations and iterations
// below.
HStoreKey curKey = new HStoreKey(key);
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(curKey);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) {
// TODO: Shouldn't this be a continue rather than a break? Perhaps
// the intent is that this DELETE_BYTES is meant to suppress older
// info -- see 5.4 Compactions in BigTable -- but how does this jibe
// with being able to remove one version only?
break;
}
result.add(tailMap.get(itKey));

View File

@ -97,10 +97,8 @@ public class HRegion implements HConstants {
throw new IOException("Cannot merge two regions with null start key");
}
// A's start key is null but B's isn't. Assume A comes before B
} else if ((srcB.getStartKey() == null) // A is not null but B is
|| (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
a = srcB;
b = srcA;
}
@ -113,10 +111,8 @@ public class HRegion implements HConstants {
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path rootDir = a.getRootDir();
Text startKey = a.getStartKey();
Text endKey = b.getEndKey();
Path merges = new Path(a.getRegionDir(), MERGEDIR);
if(! fs.exists(merges)) {
fs.mkdirs(merges);
@ -124,95 +120,20 @@ public class HRegion implements HConstants {
HRegionInfo newRegionInfo
= new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " + newRegionDir);
throw new IOException("Cannot merge; target file collision at " +
newRegionDir);
}
LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
b.getRegionName() + " into new region " + newRegionInfo.toString());
// Flush each of the sources, and merge their files into a single
// target for each column family.
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge =
Map<Text, Vector<HStoreFile>> byFamily =
new TreeMap<Text, Vector<HStoreFile>>();
for(HStoreFile src: a.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
v.add(src);
}
for(HStoreFile src: b.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
v.add(src);
}
if(LOG.isDebugEnabled()) {
LOG.debug("merging stores");
}
for (Map.Entry<Text, Vector<HStoreFile>> es: filesToMerge.entrySet()) {
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
alreadyMerged.addAll(srcFiles);
}
// That should have taken care of the bulk of the data.
// Now close the source HRegions for good, and repeat the above to take care
// of any last-minute inserts
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ a.getRegionName());
}
filesToMerge.clear();
for(HStoreFile src: a.close()) {
if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
v.add(src);
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ b.getRegionName());
}
for(HStoreFile src: b.close()) {
if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
v.add(src);
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("merging changes since start of merge");
}
for (Map.Entry<Text, Vector<HStoreFile>> es : filesToMerge.entrySet()) {
byFamily = filesByFamily(byFamily, a.close());
byFamily = filesByFamily(byFamily, b.close());
for (Map.Entry<Text, Vector<HStoreFile>> es : byFamily.entrySet()) {
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
@ -234,6 +155,25 @@ public class HRegion implements HConstants {
return dstRegion;
}
/*
* Fills a map with a vector of store files keyed by column family.
* @param byFamily Map to fill.
* @param storeFiles Store files to process.
* @return Returns <code>byFamily</code>
*/
private static Map<Text, Vector<HStoreFile>> filesByFamily(
Map<Text, Vector<HStoreFile>> byFamily, Vector<HStoreFile> storeFiles) {
for(HStoreFile src: storeFiles) {
Vector<HStoreFile> v = byFamily.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
byFamily.put(src.getColFamily(), v);
}
v.add(src);
}
return byFamily;
}
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@ -254,19 +194,19 @@ public class HRegion implements HConstants {
Path regiondir;
static class WriteState {
volatile boolean writesOngoing;
volatile boolean writesEnabled;
WriteState() {
this.writesOngoing = true;
this.writesEnabled = true;
}
// Set while a memcache flush is happening.
volatile boolean flushing = false;
// Set while a compaction is running.
volatile boolean compacting = false;
// Gets set by last flush before close. If set, cannot compact or flush
// again.
volatile boolean writesEnabled = true;
}
volatile WriteState writestate = new WriteState();
final int memcacheFlushSize;
final int blockingMemcacheSize;
int compactionThreshold = 0;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
private final long maxSequenceId;
@ -297,15 +237,12 @@ public class HRegion implements HConstants {
public HRegion(Path rootDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, Path initialFiles)
throws IOException {
this.rootDir = rootDir;
this.log = log;
this.fs = fs;
this.conf = conf;
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true;
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@ -319,7 +256,6 @@ public class HRegion implements HConstants {
}
// Load in all the HStores.
long maxSeqId = -1;
for(Map.Entry<Text, HColumnDescriptor> e :
this.regionInfo.tableDesc.families().entrySet()) {
@ -357,17 +293,12 @@ public class HRegion implements HConstants {
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
// By default, we compact the region if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold =
conf.getInt("hbase.hregion.compactionThreshold", 3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
// HRegion is ready to go!
this.writestate.writesOngoing = false;
this.writestate.compacting = false;
LOG.info("region " + this.regionInfo.regionName + " available");
}
@ -411,32 +342,29 @@ public class HRegion implements HConstants {
*
* @param abort true if server is aborting (only during testing)
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of HStoreFile objects.
* HStores make use of. It's a list of HStoreFile objects. Can be null if
* we are not to close at this time or we are already closed.
*
* @throws IOException
*/
Vector<HStoreFile> close(boolean abort) throws IOException {
if (isClosed()) {
LOG.info("region " + this.regionInfo.regionName + " already closed");
return new Vector<HStoreFile>();
return null;
}
lock.obtainWriteLock();
try {
boolean shouldClose = false;
synchronized(writestate) {
while(writestate.writesOngoing) {
while(writestate.compacting || writestate.flushing) {
try {
writestate.wait();
} catch (InterruptedException iex) {
// continue
}
}
writestate.writesOngoing = true;
shouldClose = true;
}
if(!shouldClose) {
return null;
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
}
// Write lock means no more row locks can be given out. Wait on
@ -444,23 +372,18 @@ public class HRegion implements HConstants {
// outstanding updates.
waitOnRowLocks();
Vector<HStoreFile> allHStoreFiles = null;
if (!abort) {
// Don't flush the cache if we are aborting during a test.
allHStoreFiles = internalFlushcache();
internalFlushcache();
}
Vector<HStoreFile> result = new Vector<HStoreFile>();
for (HStore store: stores.values()) {
store.close();
}
try {
return allHStoreFiles;
} finally {
synchronized (writestate) {
writestate.writesOngoing = false;
result.addAll(store.close());
}
this.closed.set(true);
LOG.info("closed " + this.regionInfo.regionName);
}
return result;
} finally {
lock.releaseWriteLock();
}
@ -527,6 +450,7 @@ public class HRegion implements HConstants {
HStoreFile a = new HStoreFile(this.conf, splits,
regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
aReference);
// Reference to top half of the hsf store file.
HStoreFile.Reference bReference = new HStoreFile.Reference(
getRegionName(), h.getFileId(), new HStoreKey(midKey),
HStoreFile.Range.top);
@ -722,11 +646,9 @@ public class HRegion implements HConstants {
this.lock.obtainReadLock();
try {
for (HStore store: stores.values()) {
if(store.getNMaps() > this.compactionThreshold) {
if (store.needsCompaction()) {
needsCompaction = true;
LOG.info(getRegionName().toString() + " needs compaction because " +
store.getNMaps() + " store files present and threshold is " +
this.compactionThreshold);
LOG.info(store.toString() + " needs compaction");
break;
}
}
@ -756,9 +678,9 @@ public class HRegion implements HConstants {
lock.obtainReadLock();
try {
synchronized (writestate) {
if ((!writestate.writesOngoing) &&
if ((!writestate.compacting) &&
writestate.writesEnabled) {
writestate.writesOngoing = true;
writestate.compacting = true;
shouldCompact = true;
}
}
@ -783,7 +705,7 @@ public class HRegion implements HConstants {
} finally {
lock.releaseReadLock();
synchronized (writestate) {
writestate.writesOngoing = false;
writestate.compacting = false;
writestate.notifyAll();
}
}
@ -825,23 +747,17 @@ public class HRegion implements HConstants {
* close() the HRegion shortly, so the HRegion should not take on any new and
* potentially long-lasting disk operations. This flush() should be the final
* pre-close() disk operation.
*
* @return List of store files including new flushes, if any. If no flushes
* because memcache is null, returns all current store files. Returns
* null if no flush (Writes are going on elsewhere -- concurrently we are
* compacting or splitting).
*/
Vector<HStoreFile> flushcache(boolean disableFutureWrites)
void flushcache(boolean disableFutureWrites)
throws IOException {
if (this.closed.get()) {
return null;
return;
}
this.noFlushCount = 0;
boolean shouldFlush = false;
synchronized(writestate) {
if((!writestate.writesOngoing) &&
writestate.writesEnabled) {
writestate.writesOngoing = true;
if((!writestate.flushing) && writestate.writesEnabled) {
writestate.flushing = true;
shouldFlush = true;
if(disableFutureWrites) {
writestate.writesEnabled = false;
@ -854,14 +770,14 @@ public class HRegion implements HConstants {
LOG.debug("NOT flushing memcache for region " +
this.regionInfo.regionName);
}
return null;
return;
}
try {
return internalFlushcache();
internalFlushcache();
} finally {
synchronized (writestate) {
writestate.writesOngoing = false;
writestate.flushing = false;
writestate.notifyAll();
}
}
@ -892,11 +808,8 @@ public class HRegion implements HConstants {
* routes.
*
* <p> This method may block for some time.
*
* @return List of store files including just-made new flushes per-store. If
* not flush, returns list of all store files.
*/
Vector<HStoreFile> internalFlushcache() throws IOException {
void internalFlushcache() throws IOException {
long startTime = -1;
if(LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
@ -917,7 +830,7 @@ public class HRegion implements HConstants {
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
LOG.debug("Finished memcache flush; empty snapshot");
return getAllStoreFiles();
return;
}
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
@ -929,11 +842,8 @@ public class HRegion implements HConstants {
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles
= hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
allHStoreFiles.addAll(0, hstoreFiles);
hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
}
// B. Write a FLUSHCACHE-COMPLETE message to the log.
@ -958,13 +868,12 @@ public class HRegion implements HConstants {
this.regionInfo.regionName + " in " +
(System.currentTimeMillis() - startTime) + "ms");
}
return allHStoreFiles;
}
private Vector<HStoreFile> getAllStoreFiles() {
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
Vector<HStoreFile> hstoreFiles = hstore.getAllStoreFiles();
allHStoreFiles.addAll(0, hstoreFiles);
}
return allHStoreFiles;
@ -1020,7 +929,6 @@ public class HRegion implements HConstants {
}
// If unavailable in memcache, check the appropriate HStore
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
if(targetStore == null) {

View File

@ -158,7 +158,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
for(HRegion cur: regionsToCheck) {
if(cur.isClosed()) {
continue; // Skip if closed
// Skip if closed
continue;
}
if (cur.needsCompaction()) {
cur.compactStores();
@ -272,10 +273,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected final Integer cacheFlusherLock = new Integer(0);
/* Runs periodically to flush memcache.
*
* Memcache flush is also called just before compaction and just before
* split so memcache is best prepared for the the long trip across
* compactions/splits during which it will not be able to flush to disk.
*/
class Flusher implements Runnable {
/**
@ -286,9 +283,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
long startTime = System.currentTimeMillis();
synchronized(cacheFlusherLock) {
// Grab a list of items to flush
Vector<HRegion> toFlush = new Vector<HRegion>();
lock.readLock().lock();
try {
@ -837,6 +832,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
BlockingQueue<ToDoEntry> toDo;
private Worker worker;
private Thread workerThread;
/** Thread that performs long running requests from the master */
class Worker implements Runnable {
void stop() {
@ -910,7 +906,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
HRegion region = onlineRegions.get(regionInfo.regionName);
if(region == null) {
region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
this.lock.writeLock().lock();
try {
this.log.setSequenceNumber(region.getMaxSequenceId());

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Vector;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.StringUtils;
import org.onelab.filter.BloomFilter;
@ -93,6 +95,8 @@ class HStore implements HConstants {
private long maxSeqId;
private int compactionThreshold;
/**
* An HStore is a set of zero or more MapFiles, which stretch backwards over
* time. A given HStore is responsible for a certain set of columns for a
@ -164,7 +168,7 @@ class HStore implements HConstants {
if(LOG.isDebugEnabled()) {
LOG.debug("starting " + this.storeName +
((reconstructionLog == null)?
((reconstructionLog == null || !fs.exists(reconstructionLog))?
" (no reconstruction log)": " with reconstruction log: " +
reconstructionLog.toString()));
}
@ -215,19 +219,19 @@ class HStore implements HConstants {
}
doReconstructionLog(reconstructionLog, maxSeqId);
this.maxSeqId += 1;
// Compact all the MapFiles into a single file. The resulting MapFile
// should be "timeless"; that is, it should not have an associated seq-ID,
// because all log messages have been reflected in the TreeMaps at this
// point.
//
// TODO: Only do the compaction if we are over a threshold, not
// every time. Not necessary if only two or three store files. Fix after
// revamp of compaction.
if(storefiles.size() > 1) {
compactHelper(true);
}
// By default, we compact if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold =
conf.getInt("hbase.hstore.compactionThreshold", 3);
// We used to compact in here before bringing the store online. Instead
// get it online quick even if it needs compactions so we can start
// taking updates as soon as possible (Once online, can take updates even
// during a compaction).
// Move maxSeqId on by one. Why here? And not in HRegion?
this.maxSeqId += 1;
// Finally, start up all the map readers! (There should be just one at this
// point, as we've compacted them all.)
@ -253,10 +257,6 @@ class HStore implements HConstants {
final long maxSeqID)
throws UnsupportedEncodingException, IOException {
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
LOG.warn("Passed reconstruction log " + reconstructionLog +
" does not exist");
}
// Nothing to do.
return;
}
@ -397,15 +397,18 @@ class HStore implements HConstants {
* Close all the MapFile readers
* @throws IOException
*/
void close() throws IOException {
Vector<HStoreFile> close() throws IOException {
Vector<HStoreFile> result = null;
this.lock.obtainWriteLock();
try {
for (MapFile.Reader reader: this.readers.values()) {
reader.close();
}
this.readers.clear();
result = new Vector<HStoreFile>(storefiles.values());
this.storefiles.clear();
LOG.info("closed " + this.storeName);
return result;
} finally {
this.lock.releaseWriteLock();
}
@ -428,16 +431,15 @@ class HStore implements HConstants {
*
* @param inputCache memcache to flush
* @param logCacheFlushId flush sequence number
* @return Vector of all the HStoreFiles in use
* @throws IOException
*/
Vector<HStoreFile> flushCache(final TreeMap<HStoreKey, byte []> inputCache,
void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
final long logCacheFlushId)
throws IOException {
return flushCacheHelper(inputCache, logCacheFlushId, true);
flushCacheHelper(inputCache, logCacheFlushId, true);
}
Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
long logCacheFlushId, boolean addToAvailableMaps)
throws IOException {
synchronized(flushLock) {
@ -447,7 +449,25 @@ class HStore implements HConstants {
String name = flushedFile.toString();
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
// hbase.hstore.compact.on.flush=true enables picking up an existing
// HStoreFIle from disk interlacing the memcache flush compacting as we
// go. The notion is that interlacing would take as long as a pure
// flush with the added benefit of having one less file in the store.
// Experiments show that it takes two to three times the amount of time
// flushing -- more column families makes it so the two timings come
// closer together -- but it also complicates the flush. Disabled for
// now. Needs work picking which file to interlace (favor references
// first, etc.)
//
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
try {
if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
this.storefiles.size() > 0) {
compact(out, inputCache.entrySet().iterator(),
this.readers.get(this.storefiles.firstKey()));
} else {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
if (this.familyName.
@ -455,6 +475,7 @@ class HStore implements HConstants {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
}
}
} finally {
out.close();
}
@ -486,14 +507,14 @@ class HStore implements HConstants {
this.lock.releaseWriteLock();
}
}
return getAllMapFiles();
return;
}
}
/**
* @return - vector of all the HStore files in use
*/
Vector<HStoreFile> getAllMapFiles() {
Vector<HStoreFile> getAllStoreFiles() {
this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(storefiles.values());
@ -506,6 +527,14 @@ class HStore implements HConstants {
// Compaction
//////////////////////////////////////////////////////////////////////////////
/**
* @return True if this store needs compaction.
*/
public boolean needsCompaction() {
return this.storefiles != null &&
this.storefiles.size() >= this.compactionThreshold;
}
/**
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
@ -528,11 +557,24 @@ class HStore implements HConstants {
compactHelper(false);
}
void compactHelper(boolean deleteSequenceInfo) throws IOException {
void compactHelper(final boolean deleteSequenceInfo)
throws IOException {
compactHelper(deleteSequenceInfo, -1);
}
/*
* @param deleteSequenceInfo True if we are to set the sequence number to -1
* on compacted file.
* @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If
* so, pass it here. Otherwise, pass -1 and it will be calculated inside in
* this method.
* @throws IOException
*/
void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
throws IOException {
synchronized(compactLock) {
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
fs.mkdirs(curCompactStore);
if(LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + storefiles.size() + " files in " +
curCompactStore.toString());
@ -547,9 +589,25 @@ class HStore implements HConstants {
this.lock.releaseWriteLock();
}
HStoreFile compactedOutputFile =
new HStoreFile(conf, compactdir, regionName, familyName, -1);
if (toCompactFiles.size() < 1 ||
(toCompactFiles.size() == 1 &&
!toCompactFiles.get(0).isReference())) {
if (LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.storeName);
}
if (deleteSequenceInfo && toCompactFiles.size() == 1) {
toCompactFiles.get(0).writeInfo(fs, -1);
}
return;
}
fs.mkdirs(curCompactStore);
// Compute the max-sequenceID seen in any of the to-be-compacted
// TreeMaps
long maxSeenSeqID = -1;
// TreeMaps if it hasn't been passed in to us.
if (maxSeenSeqID == -1) {
for (HStoreFile hsf: toCompactFiles) {
long seqid = hsf.loadInfo(fs);
if(seqid > 0) {
@ -558,18 +616,6 @@ class HStore implements HConstants {
}
}
}
HStoreFile compactedOutputFile
= new HStoreFile(conf, compactdir, regionName, familyName, -1);
if(toCompactFiles.size() == 1) {
// TODO: Only rewrite if NOT a HSF reference file.
if(LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.storeName);
}
HStoreFile hsf = toCompactFiles.elementAt(0);
if(hsf.loadInfo(fs) == -1) {
return;
}
}
// Step through them, writing to the brand-new TreeMap
@ -577,29 +623,169 @@ class HStore implements HConstants {
compactedOutputFile.getWriter(this.fs, this.compression,
this.bloomFilter);
try {
// We create a new set of MapFile.Reader objects so we don't screw up
// the caching associated with the currently-loaded ones.
//
// Our iteration-based access pattern is practically designed to ruin
// the cache.
//
// We work by opening a single MapFile.Reader for each file, and
// iterating through them in parallel. We always increment the
// lowest-ranked one. Updates to a single row/column will appear
// ranked by timestamp. This allows us to throw out deleted values or
// obsolete versions.
MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
ImmutableBytesWritable[] vals =
new ImmutableBytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
compact(compactedOut, toCompactFiles);
} finally {
compactedOut.close();
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
compactedOutputFile.writeInfo(fs, maxSeenSeqID);
} else {
compactedOutputFile.writeInfo(fs, -1);
}
// Write out a list of data files that we're replacing
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
try {
out.writeInt(toCompactFiles.size());
for(HStoreFile hsf: toCompactFiles) {
rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
keys[pos] = new HStoreKey();
vals[pos] = new ImmutableBytesWritable();
done[pos] = false;
pos++;
hsf.write(out);
}
} finally {
out.close();
}
// Indicate that we're done.
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
(new DataOutputStream(fs.create(doneFile))).close();
// Move the compaction into place.
processReadyCompaction();
} finally {
if (fs.exists(compactdir)) {
fs.delete(compactdir);
}
}
}
}
/*
* Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
* We create a new set of MapFile.Reader objects so we don't screw up
* the caching associated with the currently-loaded ones. Our
* iteration-based access pattern is practically designed to ruin
* the cache.
*
* We work by opening a single MapFile.Reader for each file, and
* iterating through them in parallel. We always increment the
* lowest-ranked one. Updates to a single row/column will appear
* ranked by timestamp. This allows us to throw out deleted values or
* obsolete versions.
* @param compactedOut
* @param toCompactFiles
* @throws IOException
*/
void compact(final MapFile.Writer compactedOut,
final Vector<HStoreFile> toCompactFiles)
throws IOException {
int size = toCompactFiles.size();
CompactionReader[] rdrs = new CompactionReader[size];
int index = 0;
for (HStoreFile hsf: toCompactFiles) {
try {
rdrs[index++] =
new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
} catch (IOException e) {
// Add info about which file threw exception. It may not be in the
// exception message so output a message here where we know the
// culprit.
LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
(hsf.isReference()? " " + hsf.getReference().toString(): ""));
throw e;
}
}
try {
compact(compactedOut, rdrs);
} finally {
for (int i = 0; i < rdrs.length; i++) {
if (rdrs[i] != null) {
try {
rdrs[i].close();
} catch (IOException e) {
LOG.warn("Exception closing reader", e);
}
}
}
}
}
interface CompactionReader {
public void close() throws IOException;
public boolean next(WritableComparable key, Writable val)
throws IOException;
public void reset() throws IOException;
}
class MapFileCompactionReader implements CompactionReader {
final MapFile.Reader reader;
MapFileCompactionReader(final MapFile.Reader r) {
this.reader = r;
}
public void close() throws IOException {
this.reader.close();
}
public boolean next(WritableComparable key, Writable val)
throws IOException {
return this.reader.next(key, val);
}
public void reset() throws IOException {
this.reader.reset();
}
}
void compact(final MapFile.Writer compactedOut,
final Iterator<Entry<HStoreKey, byte []>> iterator,
final MapFile.Reader reader)
throws IOException {
// Make an instance of a CompactionReader that wraps the iterator.
CompactionReader cr = new CompactionReader() {
public boolean next(WritableComparable key, Writable val)
throws IOException {
boolean result = false;
while (iterator.hasNext()) {
Entry<HStoreKey, byte []> e = iterator.next();
HStoreKey hsk = e.getKey();
if (familyName.equals(HStoreKey.extractFamily(hsk.getColumn()))) {
((HStoreKey)key).set(hsk);
((ImmutableBytesWritable)val).set(e.getValue());
result = true;
break;
}
}
return result;
}
@SuppressWarnings("unused")
public void reset() throws IOException {
// noop.
}
@SuppressWarnings("unused")
public void close() throws IOException {
// noop.
}
};
compact(compactedOut,
new CompactionReader [] {cr, new MapFileCompactionReader(reader)});
}
void compact(final MapFile.Writer compactedOut,
final CompactionReader [] rdrs)
throws IOException {
HStoreKey[] keys = new HStoreKey[rdrs.length];
ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
boolean[] done = new boolean[rdrs.length];
for(int i = 0; i < rdrs.length; i++) {
keys[i] = new HStoreKey();
vals[i] = new ImmutableBytesWritable();
done[i] = false;
}
// Now, advance through the readers in order. This will have the
@ -623,7 +809,6 @@ class HStore implements HConstants {
if(done[i]) {
continue;
}
if(smallestKey < 0) {
smallestKey = i;
} else {
@ -668,47 +853,10 @@ class HStore implements HConstants {
vals[smallestKey])) {
done[smallestKey] = true;
rdrs[smallestKey].close();
rdrs[smallestKey] = null;
numDone++;
}
}
} finally {
compactedOut.close();
}
if(LOG.isDebugEnabled()) {
LOG.debug("writing new compacted HStore " + compactedOutputFile);
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
compactedOutputFile.writeInfo(fs, maxSeenSeqID);
} else {
compactedOutputFile.writeInfo(fs, -1);
}
// Write out a list of data files that we're replacing
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
try {
out.writeInt(toCompactFiles.size());
for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
hsf.write(out);
}
} finally {
out.close();
}
// Indicate that we're done.
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
(new DataOutputStream(fs.create(doneFile))).close();
// Move the compaction into place.
processReadyCompaction();
} finally {
fs.delete(compactdir);
}
}
}
/**
@ -773,21 +921,19 @@ class HStore implements HConstants {
}
}
Vector<HStoreFile> toDelete = new Vector<HStoreFile>(keys.size());
for (Long key: keys) {
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
HStoreFile hsf = this.storefiles.remove(key);
// 4. Delete all old files, no longer needed
hsf.delete();
}
if(LOG.isDebugEnabled()) {
LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
// 4. Add to the toDelete files all old files, no longer needed
toDelete.add(hsf);
}
// What if we fail now? The above deletes will fail silently. We'd better
// make sure not to write out any new files with the same names as
// What if we fail now? The above deletes will fail silently. We'd
// better make sure not to write out any new files with the same names as
// something we delete, though.
// 5. Moving the new MapFile into place
@ -800,9 +946,23 @@ class HStore implements HConstants {
compactdir.toString() +
" to " + finalCompactedFile.toString() + " in " + dir.toString());
}
compactedFile.rename(this.fs, finalCompactedFile);
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
LOG.error("Failed move of compacted file " +
finalCompactedFile.toString());
return;
}
// Safe to delete now compaction has been moved into place.
for (HStoreFile hsf: toDelete) {
if (hsf.getFileId() == finalCompactedFile.getFileId()) {
// Be careful we do not delte the just compacted file.
LOG.warn("Weird. File to delete has same name as one we are " +
"about to delete (skipping): " + hsf.getFileId());
continue;
}
hsf.delete();
}
// Fail here? No worries.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
// 6. Loading the new TreeMap.
@ -810,7 +970,6 @@ class HStore implements HConstants {
finalCompactedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(orderVal, finalCompactedFile);
} finally {
// 7. Releasing the write-lock
this.lock.releaseWriteLock();
}
@ -838,6 +997,9 @@ class HStore implements HConstants {
map.reset();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
if (readkey == null) {
continue;
}
do {
Text readcol = readkey.getColumn();
if (results.get(readcol) == null
@ -1004,7 +1166,7 @@ class HStore implements HConstants {
/**
* @return Returns the number of map files currently in use
*/
int getNMaps() {
int countOfStoreFiles() {
this.lock.obtainReadLock();
try {
return storefiles.size();
@ -1014,6 +1176,22 @@ class HStore implements HConstants {
}
}
boolean hasReferences() {
boolean result = false;
this.lock.obtainReadLock();
try {
for (HStoreFile hsf: this.storefiles.values()) {
if (hsf.isReference()) {
break;
}
}
} finally {
this.lock.releaseReadLock();
}
return result;
}
//////////////////////////////////////////////////////////////////////////////
// File administration
//////////////////////////////////////////////////////////////////////////////
@ -1039,6 +1217,11 @@ class HStore implements HConstants {
return new HStoreScanner(timestamp, targetCols, firstRow);
}
@Override
public String toString() {
return this.storeName;
}
//////////////////////////////////////////////////////////////////////////////
// This class implements the HScannerInterface.
// It lets the caller scan the contents of this HStore.

View File

@ -399,7 +399,13 @@ public class HStoreFile implements HConstants, WritableComparable {
Path mapfile = curfile.getMapFilePath();
if (!fs.exists(mapfile)) {
fs.delete(curfile.getInfoFilePath());
LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " +
"Cleaned up info file. Continuing...");
continue;
}
// TODO: Confirm referent exists.
// Found map and sympathetic info file. Add this hstorefile to result.
results.add(curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next
@ -537,8 +543,7 @@ public class HStoreFile implements HConstants, WritableComparable {
try {
for(HStoreFile src: srcFiles) {
MapFile.Reader in =
new MapFile.Reader(fs, src.getMapFilePath().toString(), conf);
MapFile.Reader in = src.getReader(fs, null);
try {
HStoreKey readkey = new HStoreKey();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
@ -627,12 +632,23 @@ public class HStoreFile implements HConstants, WritableComparable {
* <code>hsf</code> directory.
* @param fs
* @param hsf
* @return True if succeeded.
* @throws IOException
*/
public void rename(final FileSystem fs, final HStoreFile hsf)
public boolean rename(final FileSystem fs, final HStoreFile hsf)
throws IOException {
fs.rename(getMapFilePath(), hsf.getMapFilePath());
fs.rename(getInfoFilePath(), hsf.getInfoFilePath());
boolean success = fs.rename(getMapFilePath(), hsf.getMapFilePath());
if (!success) {
LOG.warn("Failed rename of " + getMapFilePath() + " to " +
hsf.getMapFilePath());
return success;
}
success = fs.rename(getInfoFilePath(), hsf.getInfoFilePath());
if (!success) {
LOG.warn("Failed rename of " + getInfoFilePath() + " to " +
hsf.getInfoFilePath());
}
return success;
}
/**

View File

@ -301,6 +301,10 @@ public class HStoreKey implements WritableComparable {
if(result == 0) {
result = this.column.compareTo(other.column);
if(result == 0) {
// The below older timestamps sorting ahead of newer timestamps looks
// wrong but it is intentional. This way, newer timestamps are first
// found when we iterate over a memcache and newer versions are the
// first we trip over when reading from a store file.
if(this.timestamp < other.timestamp) {
result = 1;
} else if(this.timestamp > other.timestamp) {

View File

@ -87,6 +87,13 @@ public class ImmutableBytesWritable implements WritableComparable {
return this.bytes;
}
/**
* @param b Use passed bytes as backing array for this instance.
*/
public void set(final byte [] b) {
this.bytes = b;
}
/**
* @return the current size of the buffer.
*/

View File

@ -66,15 +66,16 @@ public class Writables {
* @param w An empty Writable (usually made by calling the null-arg
* constructor).
* @return The passed Writable after its readFields has been called fed
* by the passed <code>bytes</code> array or null if passed null or
* empty <code>bytes</code>.
* by the passed <code>bytes</code> array or IllegalArgumentException
* if passed null or an empty <code>bytes</code> array.
* @throws IOException
* @throws IllegalArgumentException
*/
public static Writable getWritable(final byte [] bytes, final Writable w)
throws IOException {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException(
"Con't build a writable with empty bytes array");
throw new IllegalArgumentException("Can't build a writable with empty " +
"bytes array");
}
if (w == null) {
throw new IllegalArgumentException("Writable cannot be null");

View File

@ -32,6 +32,14 @@ import org.apache.hadoop.io.Text;
* Abstract base class for test cases. Performs all static initialization
*/
public abstract class HBaseTestCase extends TestCase {
public final static String COLFAMILY_NAME1 = "colfamily1:";
public final static String COLFAMILY_NAME2 = "colfamily2:";
public final static String COLFAMILY_NAME3 = "colfamily3:";
protected Path testDir = null;
protected FileSystem localFs = null;
public static final char FIRST_CHAR = 'a';
public static final char LAST_CHAR = 'z';
static {
StaticTestEnvironment.initialize();
}
@ -48,6 +56,29 @@ public abstract class HBaseTestCase extends TestCase {
conf = new HBaseConfiguration();
}
@Override
protected void setUp() throws Exception {
super.setUp();
this.testDir = getUnitTestdir(getName());
this.localFs = FileSystem.getLocal(this.conf);
if (localFs.exists(testDir)) {
localFs.delete(testDir);
}
}
@Override
protected void tearDown() throws Exception {
try {
if (this.localFs != null && this.testDir != null &&
this.localFs.exists(testDir)) {
this.localFs.delete(testDir);
}
} catch (Exception e) {
e.printStackTrace();
}
super.tearDown();
}
protected Path getUnitTestdir(String testName) {
return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
}
@ -63,4 +94,112 @@ public abstract class HBaseTestCase extends TestCase {
new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
fs, conf, info, null);
}
protected HTableDescriptor createTableDescriptor(final String name) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
return htd;
}
protected void addContent(final HRegion r, final String column)
throws IOException {
Text startKey = r.getRegionInfo().getStartKey();
Text endKey = r.getRegionInfo().getEndKey();
byte [] startKeyBytes = startKey.getBytes();
if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
}
addContent(new HRegionLoader(r), column, startKeyBytes, endKey);
}
protected void addContent(final Loader updater, final String column)
throws IOException {
addContent(updater, column,
new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null);
}
protected void addContent(final Loader updater, final String column,
final byte [] startKeyBytes, final Text endKey)
throws IOException {
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
// (and values) look like this: 'aaa', 'aab', 'aac', etc.
char secondCharStart = (char)startKeyBytes[1];
char thirdCharStart = (char)startKeyBytes[2];
EXIT: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
for (char d = secondCharStart; d <= LAST_CHAR; d++) {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
if (endKey != null && endKey.getLength() > 0
&& endKey.compareTo(t) <= 0) {
break EXIT;
}
long lockid = updater.startBatchUpdate(t);
try {
updater.put(lockid, new Text(column), bytes);
updater.commit(lockid);
lockid = -1;
} finally {
if (lockid != -1) {
updater.abort(lockid);
}
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
thirdCharStart = FIRST_CHAR;
}
secondCharStart = FIRST_CHAR;
}
}
public interface Loader {
public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException;
public void commit(long lockid) throws IOException;
public void abort(long lockid) throws IOException;
}
public class HRegionLoader implements Loader {
final HRegion region;
public HRegionLoader(final HRegion HRegion) {
super();
this.region = HRegion;
}
public void abort(long lockid) throws IOException {
this.region.abort(lockid);
}
public void commit(long lockid) throws IOException {
this.region.commit(lockid, System.currentTimeMillis());
}
public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val);
}
public long startBatchUpdate(Text row) throws IOException {
return this.region.startUpdate(row);
}
}
public class HTableLoader implements Loader {
final HTable table;
public HTableLoader(final HTable table) {
super();
this.table = table;
}
public void abort(long lockid) throws IOException {
this.table.abort(lockid);
}
public void commit(long lockid) throws IOException {
this.table.commit(lockid);
}
public void put(long lockid, Text column, byte[] val) throws IOException {
this.table.put(lockid, column, val);
}
public long startBatchUpdate(Text row) {
return this.table.startBatchUpdate(row);
}
}
}

View File

@ -365,7 +365,9 @@ public class MiniHBaseCluster implements HConstants {
shutdown(this.masterThread, this.regionThreads);
// Close the file system. Will complain if files open so helps w/ leaks.
try {
if (this.cluster != null && this.cluster.getFileSystem() != null) {
this.cluster.getFileSystem().close();
}
} catch (IOException e) {
LOG.error("Closing down dfs", e);
}

View File

@ -0,0 +1,101 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Test compactions
*/
public class TestCompaction extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
protected void setUp() throws Exception {
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
}
/**
* Run compaction and flushing memcache
* @throws Exception
*/
public void testCompaction() throws Exception {
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
HTableDescriptor htd = createTableDescriptor(getName());
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
final HRegion r =
new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
try {
createStoreFile(r);
assertFalse(r.needsCompaction());
int compactionThreshold =
this.conf.getInt("hbase.hstore.compactionThreshold", 3);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
assertTrue(r.needsCompaction());
// Try to run compaction concurrent with a thread flush.
addContent(new HRegionLoader(r), COLFAMILY_NAME1);
Thread t1 = new Thread() {
@Override
public void run() {
try {
r.flushcache(false);
} catch (IOException e) {
e.printStackTrace();
}
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
assertTrue(r.compactStores());
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.setDaemon(true);
t1.start();
t2.setDaemon(true);
t2.start();
t1.join();
t2.join();
} finally {
r.close();
hlog.closeAndDelete();
}
}
private void createStoreFile(final HRegion r) throws IOException {
HRegionLoader loader = new HRegionLoader(r);
for (int i = 0; i < 3; i++) {
addContent(loader, COLFAMILY_NAME1);
}
r.flushcache(false);
}
}

View File

@ -26,7 +26,27 @@ import junit.framework.TestCase;
* Test comparing HBase objects.
*/
public class TestCompare extends TestCase {
/** test case */
/**
* HStoreKey sorts as you would expect in the row and column portions but
* for the timestamps, it sorts in reverse with the newest sorting before
* the oldest (This is intentional so we trip over the latest first when
* iterating or looking in store files).
*/
public void testHStoreKey() {
long timestamp = System.currentTimeMillis();
Text a = new Text("a");
HStoreKey past = new HStoreKey(a, a, timestamp - 10);
HStoreKey now = new HStoreKey(a, a, timestamp);
HStoreKey future = new HStoreKey(a, a, timestamp + 10);
assertTrue(past.compareTo(now) > 0);
assertTrue(now.compareTo(now) == 0);
assertTrue(future.compareTo(now) < 0);
}
/**
* Sort of HRegionInfo.
*/
public void testHRegionInfo() {
HRegionInfo a = new HRegionInfo(1, new HTableDescriptor("a"), null, null);
HRegionInfo b = new HRegionInfo(2, new HTableDescriptor("b"), null, null);

View File

@ -38,61 +38,35 @@ import org.apache.log4j.Logger;
* split and manufactures odd-ball split scenarios.
*/
public class TestSplit extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestSplit.class);
private final static String COLFAMILY_NAME1 = "colfamily1:";
private final static String COLFAMILY_NAME2 = "colfamily2:";
private final static String COLFAMILY_NAME3 = "colfamily3:";
private Path testDir = null;
private FileSystem fs = null;
private static final char FIRST_CHAR = 'a';
private static final char LAST_CHAR = 'z';
static final Log LOG = LogFactory.getLog(TestSplit.class.getName());
/** constructor */
public TestSplit() {
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
Logger.getLogger(this.getClass().getPackage().getName()).
setLevel(Level.DEBUG);
}
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
this.testDir = getUnitTestdir(getName());
this.fs = FileSystem.getLocal(this.conf);
if (fs.exists(testDir)) {
fs.delete(testDir);
}
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
}
/** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
if (fs != null) {
try {
if (this.fs.exists(testDir)) {
this.fs.delete(testDir);
}
} catch (Exception e) {
e.printStackTrace();
}
}
super.tearDown();
}
/**
* Splits twice and verifies getting from each of the split regions.
* @throws Exception
*/
public void testBasicSplit() throws Exception {
HRegion region = null;
HLog hlog = new HLog(this.fs, this.testDir, this.conf);
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
try {
HTableDescriptor htd = createTableDescriptor(getName());
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
region = new HRegion(testDir, hlog, fs, this.conf, hri, null);
region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
basicSplit(region);
} finally {
if (region != null) {
@ -102,14 +76,6 @@ public class TestSplit extends HBaseTestCase {
}
}
private HTableDescriptor createTableDescriptor(final String name) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
return htd;
}
private void basicSplit(final HRegion region) throws Exception {
addContent(region, COLFAMILY_NAME3);
region.internalFlushcache();
@ -185,12 +151,12 @@ public class TestSplit extends HBaseTestCase {
*/
public void testSplitRegionIsDeleted() throws Exception {
final int retries = 10;
this.testDir = null;
this.fs = null;
// Start up a hbase cluster
MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1);
Path testDir = cluster.regionThreads.get(0).getRegionServer().rootDir;
FileSystem fs = cluster.getDFSCluster().getFileSystem();
MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true);
Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
FileSystem fs = (cluster.getDFSCluster() == null)?
this.localFs:
cluster.getDFSCluster().getFileSystem();
HTable meta = null;
HTable t = null;
try {
@ -201,7 +167,7 @@ public class TestSplit extends HBaseTestCase {
meta = new HTable(this.conf, HConstants.META_TABLE_NAME);
int count = count(meta, HConstants.COLUMN_FAMILY_STR);
t = new HTable(this.conf, new Text(getName()));
addContent(t, COLFAMILY_NAME3);
addContent(new HTableLoader(t), COLFAMILY_NAME3);
// All is running in the one JVM so I should be able to get the
// region instance and bring on a split.
HRegionInfo hri =
@ -223,8 +189,7 @@ public class TestSplit extends HBaseTestCase {
}
HRegionInfo parent = getSplitParent(meta);
assertTrue(parent.isOffline());
Path parentDir =
HRegion.getRegionDir(testDir, parent.getRegionName());
Path parentDir = HRegion.getRegionDir(d, parent.getRegionName());
assertTrue(fs.exists(parentDir));
LOG.info("Split happened and parent " + parent.getRegionName() + " is " +
"offline");
@ -263,7 +228,7 @@ public class TestSplit extends HBaseTestCase {
for (int i = 0; i < 10; i++) {
try {
for (HRegion online: regions.values()) {
if (online.getRegionName().toString().startsWith(getName())) {
if (online.getTableDesc().getName().toString().equals(getName())) {
online.compactStores();
}
}
@ -403,79 +368,4 @@ public class TestSplit extends HBaseTestCase {
assertEquals(regions.length, 2);
return regions;
}
private void addContent(final HRegion r, final String column)
throws IOException {
Text startKey = r.getRegionInfo().getStartKey();
Text endKey = r.getRegionInfo().getEndKey();
byte [] startKeyBytes = startKey.getBytes();
if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
}
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
// (and values) look like this: 'aaa', 'aab', 'aac', etc.
char secondCharStart = (char)startKeyBytes[1];
char thirdCharStart = (char)startKeyBytes[2];
EXIT_ALL_LOOPS: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
for (char d = secondCharStart; d <= LAST_CHAR; d++) {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
if (endKey != null && endKey.getLength() > 0
&& endKey.compareTo(t) <= 0) {
break EXIT_ALL_LOOPS;
}
long lockid = r.startUpdate(t);
try {
r.put(lockid, new Text(column), bytes);
r.commit(lockid, System.currentTimeMillis());
lockid = -1;
} finally {
if (lockid != -1) {
r.abort(lockid);
}
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
thirdCharStart = FIRST_CHAR;
}
secondCharStart = FIRST_CHAR;
}
}
// TODO: Have HTable and HRegion implement interface that has in it
// startUpdate, put, delete, commit, abort, etc.
private void addContent(final HTable table, final String column)
throws IOException {
byte [] startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
// (and values) look like this: 'aaa', 'aab', 'aac', etc.
char secondCharStart = (char)startKeyBytes[1];
char thirdCharStart = (char)startKeyBytes[2];
for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
for (char d = secondCharStart; d <= LAST_CHAR; d++) {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
long lockid = table.startUpdate(t);
try {
table.put(lockid, new Text(column), bytes);
table.commit(lockid, System.currentTimeMillis());
lockid = -1;
} finally {
if (lockid != -1) {
table.abort(lockid);
}
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
thirdCharStart = FIRST_CHAR;
}
secondCharStart = FIRST_CHAR;
}
}
}