From be33a241ce1a2926fb72a961b8d02213057d14a8 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 16 Aug 2007 01:07:51 +0000 Subject: [PATCH] HADOOP-1644 [hbase] Compactions should not block updates Disentangles flushes and compactions; flushes can proceed while a compaction is happening. Also, don't compact unless we hit compaction threshold: i.e. don't automatically compact on HRegion startup so regions can come online the faster. M src/contrib/hbase/conf/hbase-default.xml (hbase.hregion.compactionThreashold): Moved to be a hstore property as part of encapsulating compaction decision inside hstore. M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Refactored. Moved here generalized content loading code that can be shared by tests. Add to setup and teardown the setup and removal of local test dir (if it exists). M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java Added test of HStoreKey compare (It works other than one would at first expect). M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Bulk of content loading code has been moved up into the parent class. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (tableExists): Restore to a check of if the asked-for table is in list of tables. As it was, a check for tableExists would just wait on all timeouts and retries to expire and then report table does not exist.. Fixed up debug message listing regions of a table. Added protection against meta table not having a COL_REGINFO (Seen in cluster testing -- probably a bug in row removal). M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Loading store files, even if it was noticed that there was no corresponding map file, was still counting file as valid. Also fix merger -- was constructing MapFile.Reader directly rather than asking HStoreFile for the reader (HStoreFile knows how to do MapFile references) (rename): Added check that move succeeded and logging. In cluster-testing, the hdfs move of compacted file into place has failed on occasion (Need more info). M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Encapsulate ruling on whether a compaction should take place inside HStore. Added reading of the compactionThreshold her. Compaction threshold is currently just number of store files. Later may include other factors such as count of reference files. Cleaned up debug messages around reconstruction log. Removed compaction if size > 1 from constructor. Let compaction happen after we've been deployed (Compactions that happen while we are online can continue to take updates. Compaction in the constructor puts off our being able to take in updates). (close): Changed so it now returns set of store files. This used to be done by calls to flush. Since flush and compaction have been disentangled, a compaction can come in after flush and the list of files could be off. Having it done by close, can be sure list of files is complete. (flushCache): No longer returns set of store files. Added 'merging compaction' where we pick an arbitrary store file from disk and merge into it the content of memcache (Needs work). (getAllMapFiles): Renamed getAllStoreFiles. (needsCompaction): Added. (compactHelper): Added passing of maximum sequence number if already calculated. If compacting one file only, we used skip without rewriting the info file. Fixed. Refactored. Moved guts to new compact(outFile, listOfStores) method. (compact, CompactionReader): Added overrides and interface to support 'merging compaction' that takes files and memcache. In compaction, if we failed the move of the compacted file, all data had already been deleted. Changing, so deletion happens after confirmed move of compacted file. (getFull): Fixed bug where NPE when read of maps came back null. Revealed by our NOT compacting stores on startup. Meant could be two backing stores one of which had no data regards queried key. (getNMaps): Renamed countOfStoreFiles. (toString): Added. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Added comment on 'odd'-looking comparison. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Javadoc edit. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java Only return first 128 bytes of value when toStringing (On cluster, was returning complete web pages in log). M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Removed confusing debug message (made sense once -- but not now). Test rootRegionLocation for null before using it (can be null). M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Added comment that delete behavior needs study. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Fixed merge so it doesn't do the incremental based off files returned by flush. Instead all is done in the one go after region closes (using files returned by close). Moved duplicated code to new filesByFamily method. (WriteState): Removed writesOngoing in favor of compacting and flushing flags. (flushCache): No longer returns list of files. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java Fix javadoc. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@566459 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + conf/hbase-default.xml | 2 +- .../org/apache/hadoop/hbase/HConnection.java | 1 + .../hadoop/hbase/HConnectionManager.java | 58 +- .../org/apache/hadoop/hbase/HLogEdit.java | 17 +- src/java/org/apache/hadoop/hbase/HMaster.java | 43 +- .../org/apache/hadoop/hbase/HMemcache.java | 11 +- src/java/org/apache/hadoop/hbase/HRegion.java | 232 +++----- .../apache/hadoop/hbase/HRegionServer.java | 13 +- src/java/org/apache/hadoop/hbase/HStore.java | 509 ++++++++++++------ .../org/apache/hadoop/hbase/HStoreFile.java | 26 +- .../org/apache/hadoop/hbase/HStoreKey.java | 4 + .../hbase/io/ImmutableBytesWritable.java | 7 + .../apache/hadoop/hbase/util/Writables.java | 9 +- .../apache/hadoop/hbase/HBaseTestCase.java | 139 +++++ .../apache/hadoop/hbase/MiniHBaseCluster.java | 4 +- .../apache/hadoop/hbase/TestCompaction.java | 101 ++++ .../org/apache/hadoop/hbase/TestCompare.java | 22 +- .../org/apache/hadoop/hbase/TestSplit.java | 142 +---- 19 files changed, 810 insertions(+), 531 deletions(-) create mode 100644 src/test/org/apache/hadoop/hbase/TestCompaction.java diff --git a/CHANGES.txt b/CHANGES.txt index 792189871e8..97ec53b7fce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -95,3 +95,4 @@ Trunk (unreleased changes) 58. HADOOP-1710 All updates should be batch updates 59. HADOOP-1711 HTable API should use interfaces instead of concrete classes as method parameters and return values + 60. HADOOP-1644 Compactions should not block updates diff --git a/conf/hbase-default.xml b/conf/hbase-default.xml index c9119a1c071..9a4316c22f6 100644 --- a/conf/hbase-default.xml +++ b/conf/hbase-default.xml @@ -147,7 +147,7 @@ - hbase.hregion.compactionThreshold + hbase.hstore.compactionThreshold 3 If more than this number of HStoreFiles in any one HStore diff --git a/src/java/org/apache/hadoop/hbase/HConnection.java b/src/java/org/apache/hadoop/hbase/HConnection.java index 714c2f1f2d3..442b904f8c8 100644 --- a/src/java/org/apache/hadoop/hbase/HConnection.java +++ b/src/java/org/apache/hadoop/hbase/HConnection.java @@ -38,6 +38,7 @@ public interface HConnection { public boolean isMasterRunning(); /** + * Checks if tableName exists. * @param tableName Table to check. * @return True if table exists already. */ diff --git a/src/java/org/apache/hadoop/hbase/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/HConnectionManager.java index 9f3b4306dbb..56215760df6 100644 --- a/src/java/org/apache/hadoop/hbase/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/HConnectionManager.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Writables; * multiple HBase instances */ public class HConnectionManager implements HConstants { - private HConnectionManager(){} // Not instantiable + private HConnectionManager() {} // Not instantiable // A Map of master HServerAddress -> connection information for that instance // Note that although the Map is synchronized, the objects it contains @@ -209,15 +209,19 @@ public class HConnectionManager implements HConstants { /** {@inheritDoc} */ public boolean tableExists(final Text tableName) { - boolean exists = true; + if (tableName == null) { + throw new IllegalArgumentException("Table name cannot be null"); + } + boolean exists = false; try { - SortedMap servers = getTableServers(tableName); - if (servers == null || servers.size() == 0) { - exists = false; + HTableDescriptor[] tables = listTables(); + for (int i = 0; i < tables.length; i++) { + if (tables[i].getName().equals(tableName)) { + exists = true; + } } - } catch (IOException e) { - exists = false; + LOG.warn("Testing for table existence threw exception", e); } return exists; } @@ -400,7 +404,6 @@ public class HConnectionManager implements HConstants { throws IOException { // Wipe out everything we know about this table - if (this.tablesToServers.remove(tableName) != null) { if (LOG.isDebugEnabled()) { LOG.debug("Wiping out all we know of " + tableName); @@ -524,9 +527,10 @@ public class HConnectionManager implements HConstants { } this.tablesToServers.put(tableName, servers); if (LOG.isDebugEnabled()) { + int count = 0; for (Map.Entry e: servers.entrySet()) { - LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + - " for table " + tableName); + LOG.debug("Region " + (1 + count++) + " of " + servers.size() + + ": " + e.getValue()); } } return servers; @@ -650,40 +654,47 @@ public class HConnectionManager implements HConstants { new TreeMap(); for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { - long scannerId = -1L; try { - scannerId = - server.openScanner(t.getRegionInfo().getRegionName(), - COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); + scannerId = server.openScanner(t.getRegionInfo().getRegionName(), + COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); while (true) { - HRegionInfo regionInfo = null; - String serverAddress = null; KeyedData[] values = server.next(scannerId); if (values.length == 0) { if (servers.size() == 0) { // If we didn't find any servers then the table does not exist throw new TableNotFoundException("table '" + tableName + - "' does not exist in " + t); + "' does not exist in " + t); } // We found at least one server for the table and now we're done. if (LOG.isDebugEnabled()) { LOG.debug("Found " + servers.size() + " server(s) for " + - "location: " + t + " for tablename " + tableName); + tableName + " at " + t); } break; } - byte[] bytes = null; TreeMap results = new TreeMap(); for (int i = 0; i < values.length; i++) { results.put(values[i].getKey().getColumn(), values[i].getData()); } - regionInfo = new HRegionInfo(); - regionInfo = (HRegionInfo) Writables.getWritable( - results.get(COL_REGIONINFO), regionInfo); + + byte[] bytes = results.get(COL_REGIONINFO); + if (bytes == null || bytes.length == 0) { + // This can be null. Looks like an info:splitA or info:splitB + // is only item in the row. + if (LOG.isDebugEnabled()) { + LOG.debug(COL_REGIONINFO.toString() + " came back empty: " + + results.toString()); + } + servers.clear(); + break; + } + + HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable( + results.get(COL_REGIONINFO), new HRegionInfo()); if (!regionInfo.tableDesc.getName().equals(tableName)) { // We're done @@ -707,7 +718,8 @@ public class HConnectionManager implements HConstants { servers.clear(); break; } - serverAddress = Writables.bytesToString(bytes); + + String serverAddress = Writables.bytesToString(bytes); servers.put(regionInfo.startKey, new HRegionLocation( regionInfo, new HServerAddress(serverAddress))); } diff --git a/src/java/org/apache/hadoop/hbase/HLogEdit.java b/src/java/org/apache/hadoop/hbase/HLogEdit.java index d25001a83f3..6e5b6aa818b 100644 --- a/src/java/org/apache/hadoop/hbase/HLogEdit.java +++ b/src/java/org/apache/hadoop/hbase/HLogEdit.java @@ -34,6 +34,7 @@ public class HLogEdit implements Writable { private Text column = new Text(); private byte [] val; private long timestamp; + private final int MAX_VALUE_LEN = 128; /** * Default constructor used by Writable @@ -69,17 +70,23 @@ public class HLogEdit implements Writable { return this.timestamp; } - /** {@inheritDoc} */ + /** + * @return First column name, timestamp, and first 128 bytes of the value + * bytes as a String. + */ @Override public String toString() { String value = ""; try { - value = new String(getVal(), HConstants.UTF8_ENCODING); - + value = (this.val.length > MAX_VALUE_LEN)? + new String(this.val, 0, MAX_VALUE_LEN, HConstants.UTF8_ENCODING) + + "...": + new String(getVal(), HConstants.UTF8_ENCODING); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UTF8 encoding not present?", e); } - return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + value + ")"; + return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + + value + ")"; } // Writable @@ -99,4 +106,4 @@ public class HLogEdit implements Writable { in.readFully(this.val); this.timestamp = in.readLong(); } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index c74b4e7e95b..aa008764b82 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -312,18 +312,16 @@ HMasterRegionInterface, Runnable { boolean noReferencesB = splitB == null; if (!noReferencesA) { - noReferencesA = - hasReferences(metaRegionName, server, info.getRegionName(), splitA, COL_SPLITA); + noReferencesA = hasReferences(metaRegionName, server, + info.getRegionName(), splitA, COL_SPLITA); } if (!noReferencesB) { - noReferencesB = - hasReferences(metaRegionName, server, info.getRegionName(), splitB, COL_SPLITB); + noReferencesB = hasReferences(metaRegionName, server, + info.getRegionName(), splitB, COL_SPLITB); } - if (!(noReferencesA && noReferencesB)) { - + if (!noReferencesA && !noReferencesB) { // No references. Remove this item from table and deleted region on // disk. - LOG.info("Deleting region " + info.getRegionName() + " because daughter splits no longer hold references"); @@ -337,7 +335,6 @@ HMasterRegionInterface, Runnable { b.delete(lockid, COL_SERVER); b.delete(lockid, COL_STARTCODE); server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); - result = true; } @@ -361,8 +358,8 @@ HMasterRegionInterface, Runnable { Path [] ps = fs.listPaths(p, new PathFilter () { - public boolean accept(Path path) { - return HStoreFile.isReference(path); + public boolean accept(Path p) { + return HStoreFile.isReference(p); } } ); @@ -394,18 +391,11 @@ HMasterRegionInterface, Runnable { final String serverName, final long startCode) { // Skip region - if ... - - if(info.offLine // offline - || killedRegions.contains(info.regionName) // queued for offline - || regionsToDelete.contains(info.regionName)) { // queued for delete - + if(info.offLine // offline + || killedRegions.contains(info.regionName) // queued for offline + || regionsToDelete.contains(info.regionName)) { // queued for delete unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); - - if(LOG.isDebugEnabled()) { - LOG.debug("not assigning region: " + info.regionName + " (offline: " + - info.isOffline() + ", split: " + info.isSplit() + ")"); - } return; } @@ -416,7 +406,6 @@ HMasterRegionInterface, Runnable { regionsToKill.containsKey(info.regionName)) { // Skip if region is on kill list - if(LOG.isDebugEnabled()) { LOG.debug("not assigning region (on kill list): " + info.regionName); } @@ -431,14 +420,8 @@ HMasterRegionInterface, Runnable { && (storedInfo == null || storedInfo.getStartCode() != startCode)) { // The current assignment is no good; load the region. - unassignedRegions.put(info.regionName, info); assignAttempts.put(info.regionName, Long.valueOf(0L)); - - } else if (LOG.isDebugEnabled()) { - LOG.debug("Finished if " + info.getRegionName() + " is assigned: " + - "unassigned: " + unassignedRegions.containsKey(info.regionName) + - ", pending: " + pendingRegions.contains(info.regionName)); } } } @@ -2155,8 +2138,10 @@ HMasterRegionInterface, Runnable { if (rootRegionLocation.get() == null || !rootScanned) { // We can't proceed until the root region is online and has been scanned if (LOG.isDebugEnabled()) { - LOG.debug("root region=" + rootRegionLocation.get().toString() + - ", rootScanned=" + rootScanned); + LOG.debug("root region: " + + ((rootRegionLocation != null)? + rootRegionLocation.toString(): "null") + + ", rootScanned: " + rootScanned); } return false; } diff --git a/src/java/org/apache/hadoop/hbase/HMemcache.java b/src/java/org/apache/hadoop/hbase/HMemcache.java index a2eb76f3710..16cb3968041 100644 --- a/src/java/org/apache/hadoop/hbase/HMemcache.java +++ b/src/java/org/apache/hadoop/hbase/HMemcache.java @@ -243,6 +243,7 @@ public class HMemcache { * * TODO - This is kinda slow. We need a data structure that allows for * proximity-searches, not just precise-matches. + * * @param map * @param key * @param numVersions @@ -251,13 +252,19 @@ public class HMemcache { ArrayList get(final TreeMap map, final HStoreKey key, final int numVersions) { ArrayList result = new ArrayList(); - HStoreKey curKey = - new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp()); + // TODO: If get is of a particular version -- numVersions == 1 -- we + // should be able to avoid all of the tailmap creations and iterations + // below. + HStoreKey curKey = new HStoreKey(key); SortedMap tailMap = map.tailMap(curKey); for (Map.Entry es: tailMap.entrySet()) { HStoreKey itKey = es.getKey(); if (itKey.matchesRowCol(curKey)) { if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) { + // TODO: Shouldn't this be a continue rather than a break? Perhaps + // the intent is that this DELETE_BYTES is meant to suppress older + // info -- see 5.4 Compactions in BigTable -- but how does this jibe + // with being able to remove one version only? break; } result.add(tailMap.get(itKey)); diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 20dcb6aaeff..0debc8323e3 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -92,15 +92,13 @@ public class HRegion implements HConstants { // Make sure that srcA comes first; important for key-ordering during // write of the merged file. FileSystem fs = srcA.getFilesystem(); - if(srcA.getStartKey() == null) { - if(srcB.getStartKey() == null) { + if (srcA.getStartKey() == null) { + if (srcB.getStartKey() == null) { throw new IOException("Cannot merge two regions with null start key"); } // A's start key is null but B's isn't. Assume A comes before B - - } else if((srcB.getStartKey() == null) // A is not null but B is + } else if ((srcB.getStartKey() == null) // A is not null but B is || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B - a = srcB; b = srcA; } @@ -113,10 +111,8 @@ public class HRegion implements HConstants { HTableDescriptor tabledesc = a.getTableDesc(); HLog log = a.getLog(); Path rootDir = a.getRootDir(); - Text startKey = a.getStartKey(); Text endKey = b.getEndKey(); - Path merges = new Path(a.getRegionDir(), MERGEDIR); if(! fs.exists(merges)) { fs.mkdirs(merges); @@ -124,95 +120,20 @@ public class HRegion implements HConstants { HRegionInfo newRegionInfo = new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey); - Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName); - if(fs.exists(newRegionDir)) { - throw new IOException("Cannot merge; target file collision at " + newRegionDir); + throw new IOException("Cannot merge; target file collision at " + + newRegionDir); } LOG.info("starting merge of regions: " + a.getRegionName() + " and " + b.getRegionName() + " into new region " + newRegionInfo.toString()); - - // Flush each of the sources, and merge their files into a single - // target for each column family. - TreeSet alreadyMerged = new TreeSet(); - TreeMap> filesToMerge = + + Map> byFamily = new TreeMap>(); - - for(HStoreFile src: a.flushcache(true)) { - Vector v = filesToMerge.get(src.getColFamily()); - if(v == null) { - v = new Vector(); - filesToMerge.put(src.getColFamily(), v); - } - v.add(src); - } - - for(HStoreFile src: b.flushcache(true)) { - Vector v = filesToMerge.get(src.getColFamily()); - if(v == null) { - v = new Vector(); - filesToMerge.put(src.getColFamily(), v); - } - v.add(src); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("merging stores"); - } - - for (Map.Entry> es: filesToMerge.entrySet()) { - Text colFamily = es.getKey(); - Vector srcFiles = es.getValue(); - HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, - colFamily, Math.abs(rand.nextLong())); - dst.mergeStoreFiles(srcFiles, fs, conf); - alreadyMerged.addAll(srcFiles); - } - - // That should have taken care of the bulk of the data. - // Now close the source HRegions for good, and repeat the above to take care - // of any last-minute inserts - if(LOG.isDebugEnabled()) { - LOG.debug("flushing changes since start of merge for region " - + a.getRegionName()); - } - - filesToMerge.clear(); - - for(HStoreFile src: a.close()) { - if(! alreadyMerged.contains(src)) { - Vector v = filesToMerge.get(src.getColFamily()); - if(v == null) { - v = new Vector(); - filesToMerge.put(src.getColFamily(), v); - } - v.add(src); - } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("flushing changes since start of merge for region " - + b.getRegionName()); - } - - for(HStoreFile src: b.close()) { - if(! alreadyMerged.contains(src)) { - Vector v = filesToMerge.get(src.getColFamily()); - if(v == null) { - v = new Vector(); - filesToMerge.put(src.getColFamily(), v); - } - v.add(src); - } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("merging changes since start of merge"); - } - - for (Map.Entry> es : filesToMerge.entrySet()) { + byFamily = filesByFamily(byFamily, a.close()); + byFamily = filesByFamily(byFamily, b.close()); + for (Map.Entry> es : byFamily.entrySet()) { Text colFamily = es.getKey(); Vector srcFiles = es.getValue(); HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, @@ -233,6 +154,25 @@ public class HRegion implements HConstants { return dstRegion; } + + /* + * Fills a map with a vector of store files keyed by column family. + * @param byFamily Map to fill. + * @param storeFiles Store files to process. + * @return Returns byFamily + */ + private static Map> filesByFamily( + Map> byFamily, Vector storeFiles) { + for(HStoreFile src: storeFiles) { + Vector v = byFamily.get(src.getColFamily()); + if(v == null) { + v = new Vector(); + byFamily.put(src.getColFamily(), v); + } + v.add(src); + } + return byFamily; + } ////////////////////////////////////////////////////////////////////////////// // Members @@ -254,19 +194,19 @@ public class HRegion implements HConstants { Path regiondir; static class WriteState { - volatile boolean writesOngoing; - volatile boolean writesEnabled; - WriteState() { - this.writesOngoing = true; - this.writesEnabled = true; - } + // Set while a memcache flush is happening. + volatile boolean flushing = false; + // Set while a compaction is running. + volatile boolean compacting = false; + // Gets set by last flush before close. If set, cannot compact or flush + // again. + volatile boolean writesEnabled = true; } volatile WriteState writestate = new WriteState(); final int memcacheFlushSize; final int blockingMemcacheSize; - int compactionThreshold = 0; private final HLocking lock = new HLocking(); private long desiredMaxFileSize; private final long maxSequenceId; @@ -297,15 +237,12 @@ public class HRegion implements HConstants { public HRegion(Path rootDir, HLog log, FileSystem fs, Configuration conf, HRegionInfo regionInfo, Path initialFiles) throws IOException { - this.rootDir = rootDir; this.log = log; this.fs = fs; this.conf = conf; this.regionInfo = regionInfo; this.memcache = new HMemcache(); - this.writestate.writesOngoing = true; - this.writestate.writesEnabled = true; // Declare the regionName. This is a unique string for the region, used to // build a unique filename. @@ -319,7 +256,6 @@ public class HRegion implements HConstants { } // Load in all the HStores. - long maxSeqId = -1; for(Map.Entry e : this.regionInfo.tableDesc.families().entrySet()) { @@ -357,17 +293,12 @@ public class HRegion implements HConstants { this.blockingMemcacheSize = this.memcacheFlushSize * conf.getInt("hbase.hregion.memcache.block.multiplier", 2); - // By default, we compact the region if an HStore has more than - // MIN_COMMITS_FOR_COMPACTION map files - this.compactionThreshold = - conf.getInt("hbase.hregion.compactionThreshold", 3); - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. this.desiredMaxFileSize = conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); // HRegion is ready to go! - this.writestate.writesOngoing = false; + this.writestate.compacting = false; LOG.info("region " + this.regionInfo.regionName + " available"); } @@ -411,56 +342,48 @@ public class HRegion implements HConstants { * * @param abort true if server is aborting (only during testing) * @return Vector of all the storage files that the HRegion's component - * HStores make use of. It's a list of HStoreFile objects. + * HStores make use of. It's a list of HStoreFile objects. Can be null if + * we are not to close at this time or we are already closed. * * @throws IOException */ Vector close(boolean abort) throws IOException { if (isClosed()) { LOG.info("region " + this.regionInfo.regionName + " already closed"); - return new Vector(); + return null; } lock.obtainWriteLock(); try { - boolean shouldClose = false; synchronized(writestate) { - while(writestate.writesOngoing) { + while(writestate.compacting || writestate.flushing) { try { writestate.wait(); } catch (InterruptedException iex) { // continue } } - writestate.writesOngoing = true; - shouldClose = true; - } - - if(!shouldClose) { - return null; + // Disable compacting and flushing by background threads for this + // region. + writestate.writesEnabled = false; } // Write lock means no more row locks can be given out. Wait on // outstanding row locks to come in before we close so we do not drop // outstanding updates. waitOnRowLocks(); - - Vector allHStoreFiles = null; + if (!abort) { // Don't flush the cache if we are aborting during a test. - allHStoreFiles = internalFlushcache(); + internalFlushcache(); } + + Vector result = new Vector(); for (HStore store: stores.values()) { - store.close(); - } - try { - return allHStoreFiles; - } finally { - synchronized (writestate) { - writestate.writesOngoing = false; - } - this.closed.set(true); - LOG.info("closed " + this.regionInfo.regionName); + result.addAll(store.close()); } + this.closed.set(true); + LOG.info("closed " + this.regionInfo.regionName); + return result; } finally { lock.releaseWriteLock(); } @@ -527,6 +450,7 @@ public class HRegion implements HConstants { HStoreFile a = new HStoreFile(this.conf, splits, regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()), aReference); + // Reference to top half of the hsf store file. HStoreFile.Reference bReference = new HStoreFile.Reference( getRegionName(), h.getFileId(), new HStoreKey(midKey), HStoreFile.Range.top); @@ -721,12 +645,10 @@ public class HRegion implements HConstants { boolean needsCompaction = false; this.lock.obtainReadLock(); try { - for(HStore store: stores.values()) { - if(store.getNMaps() > this.compactionThreshold) { + for (HStore store: stores.values()) { + if (store.needsCompaction()) { needsCompaction = true; - LOG.info(getRegionName().toString() + " needs compaction because " + - store.getNMaps() + " store files present and threshold is " + - this.compactionThreshold); + LOG.info(store.toString() + " needs compaction"); break; } } @@ -756,9 +678,9 @@ public class HRegion implements HConstants { lock.obtainReadLock(); try { synchronized (writestate) { - if ((!writestate.writesOngoing) && + if ((!writestate.compacting) && writestate.writesEnabled) { - writestate.writesOngoing = true; + writestate.compacting = true; shouldCompact = true; } } @@ -783,7 +705,7 @@ public class HRegion implements HConstants { } finally { lock.releaseReadLock(); synchronized (writestate) { - writestate.writesOngoing = false; + writestate.compacting = false; writestate.notifyAll(); } } @@ -825,23 +747,17 @@ public class HRegion implements HConstants { * close() the HRegion shortly, so the HRegion should not take on any new and * potentially long-lasting disk operations. This flush() should be the final * pre-close() disk operation. - * - * @return List of store files including new flushes, if any. If no flushes - * because memcache is null, returns all current store files. Returns - * null if no flush (Writes are going on elsewhere -- concurrently we are - * compacting or splitting). */ - Vector flushcache(boolean disableFutureWrites) + void flushcache(boolean disableFutureWrites) throws IOException { if (this.closed.get()) { - return null; + return; } this.noFlushCount = 0; boolean shouldFlush = false; synchronized(writestate) { - if((!writestate.writesOngoing) && - writestate.writesEnabled) { - writestate.writesOngoing = true; + if((!writestate.flushing) && writestate.writesEnabled) { + writestate.flushing = true; shouldFlush = true; if(disableFutureWrites) { writestate.writesEnabled = false; @@ -854,14 +770,14 @@ public class HRegion implements HConstants { LOG.debug("NOT flushing memcache for region " + this.regionInfo.regionName); } - return null; + return; } try { - return internalFlushcache(); + internalFlushcache(); } finally { synchronized (writestate) { - writestate.writesOngoing = false; + writestate.flushing = false; writestate.notifyAll(); } } @@ -892,11 +808,8 @@ public class HRegion implements HConstants { * routes. * *

This method may block for some time. - * - * @return List of store files including just-made new flushes per-store. If - * not flush, returns list of all store files. */ - Vector internalFlushcache() throws IOException { + void internalFlushcache() throws IOException { long startTime = -1; if(LOG.isDebugEnabled()) { startTime = System.currentTimeMillis(); @@ -917,7 +830,7 @@ public class HRegion implements HConstants { HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log); if(retval == null || retval.memcacheSnapshot == null) { LOG.debug("Finished memcache flush; empty snapshot"); - return getAllStoreFiles(); + return; } long logCacheFlushId = retval.sequenceId; if(LOG.isDebugEnabled()) { @@ -929,11 +842,8 @@ public class HRegion implements HConstants { // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. - Vector allHStoreFiles = new Vector(); for(HStore hstore: stores.values()) { - Vector hstoreFiles - = hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); - allHStoreFiles.addAll(0, hstoreFiles); + hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); } // B. Write a FLUSHCACHE-COMPLETE message to the log. @@ -958,13 +868,12 @@ public class HRegion implements HConstants { this.regionInfo.regionName + " in " + (System.currentTimeMillis() - startTime) + "ms"); } - return allHStoreFiles; } private Vector getAllStoreFiles() { Vector allHStoreFiles = new Vector(); for(HStore hstore: stores.values()) { - Vector hstoreFiles = hstore.getAllMapFiles(); + Vector hstoreFiles = hstore.getAllStoreFiles(); allHStoreFiles.addAll(0, hstoreFiles); } return allHStoreFiles; @@ -1020,7 +929,6 @@ public class HRegion implements HConstants { } // If unavailable in memcache, check the appropriate HStore - Text colFamily = HStoreKey.extractFamily(key.getColumn()); HStore targetStore = stores.get(colFamily); if(targetStore == null) { diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 7a25f726ae9..ab64197d504 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -158,7 +158,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { try { for(HRegion cur: regionsToCheck) { if(cur.isClosed()) { - continue; // Skip if closed + // Skip if closed + continue; } if (cur.needsCompaction()) { cur.compactStores(); @@ -272,10 +273,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { protected final Integer cacheFlusherLock = new Integer(0); /* Runs periodically to flush memcache. - * - * Memcache flush is also called just before compaction and just before - * split so memcache is best prepared for the the long trip across - * compactions/splits during which it will not be able to flush to disk. */ class Flusher implements Runnable { /** @@ -286,9 +283,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { long startTime = System.currentTimeMillis(); synchronized(cacheFlusherLock) { - // Grab a list of items to flush - Vector toFlush = new Vector(); lock.readLock().lock(); try { @@ -837,6 +832,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { BlockingQueue toDo; private Worker worker; private Thread workerThread; + /** Thread that performs long running requests from the master */ class Worker implements Runnable { void stop() { @@ -910,7 +906,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { HRegion region = onlineRegions.get(regionInfo.regionName); if(region == null) { region = new HRegion(rootDir, log, fs, conf, regionInfo, null); - this.lock.writeLock().lock(); try { this.log.setSequenceNumber(region.getMaxSequenceId()); @@ -1193,7 +1188,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * @return {@link HRegion} for regionName * @throws NotServingRegionException */ - protected HRegion getRegion(final Text regionName, + protected HRegion getRegion(final Text regionName, final boolean checkRetiringRegions) throws NotServingRegionException { HRegion region = null; diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index fdb90e90a89..93ed5305582 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import java.util.Vector; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.StringUtils; import org.onelab.filter.BloomFilter; @@ -92,6 +94,8 @@ class HStore implements HConstants { Random rand = new Random(); private long maxSeqId; + + private int compactionThreshold; /** * An HStore is a set of zero or more MapFiles, which stretch backwards over @@ -164,7 +168,7 @@ class HStore implements HConstants { if(LOG.isDebugEnabled()) { LOG.debug("starting " + this.storeName + - ((reconstructionLog == null)? + ((reconstructionLog == null || !fs.exists(reconstructionLog))? " (no reconstruction log)": " with reconstruction log: " + reconstructionLog.toString())); } @@ -215,19 +219,19 @@ class HStore implements HConstants { } doReconstructionLog(reconstructionLog, maxSeqId); - this.maxSeqId += 1; - // Compact all the MapFiles into a single file. The resulting MapFile - // should be "timeless"; that is, it should not have an associated seq-ID, - // because all log messages have been reflected in the TreeMaps at this - // point. - // - // TODO: Only do the compaction if we are over a threshold, not - // every time. Not necessary if only two or three store files. Fix after - // revamp of compaction. - if(storefiles.size() > 1) { - compactHelper(true); - } + // By default, we compact if an HStore has more than + // MIN_COMMITS_FOR_COMPACTION map files + this.compactionThreshold = + conf.getInt("hbase.hstore.compactionThreshold", 3); + + // We used to compact in here before bringing the store online. Instead + // get it online quick even if it needs compactions so we can start + // taking updates as soon as possible (Once online, can take updates even + // during a compaction). + + // Move maxSeqId on by one. Why here? And not in HRegion? + this.maxSeqId += 1; // Finally, start up all the map readers! (There should be just one at this // point, as we've compacted them all.) @@ -253,10 +257,6 @@ class HStore implements HConstants { final long maxSeqID) throws UnsupportedEncodingException, IOException { if (reconstructionLog == null || !fs.exists(reconstructionLog)) { - if (reconstructionLog != null && !fs.exists(reconstructionLog)) { - LOG.warn("Passed reconstruction log " + reconstructionLog + - " does not exist"); - } // Nothing to do. return; } @@ -397,15 +397,18 @@ class HStore implements HConstants { * Close all the MapFile readers * @throws IOException */ - void close() throws IOException { + Vector close() throws IOException { + Vector result = null; this.lock.obtainWriteLock(); try { for (MapFile.Reader reader: this.readers.values()) { reader.close(); } this.readers.clear(); + result = new Vector(storefiles.values()); this.storefiles.clear(); LOG.info("closed " + this.storeName); + return result; } finally { this.lock.releaseWriteLock(); } @@ -428,16 +431,15 @@ class HStore implements HConstants { * * @param inputCache memcache to flush * @param logCacheFlushId flush sequence number - * @return Vector of all the HStoreFiles in use * @throws IOException */ - Vector flushCache(final TreeMap inputCache, + void flushCache(final TreeMap inputCache, final long logCacheFlushId) throws IOException { - return flushCacheHelper(inputCache, logCacheFlushId, true); + flushCacheHelper(inputCache, logCacheFlushId, true); } - Vector flushCacheHelper(TreeMap inputCache, + void flushCacheHelper(TreeMap inputCache, long logCacheFlushId, boolean addToAvailableMaps) throws IOException { synchronized(flushLock) { @@ -447,12 +449,31 @@ class HStore implements HConstants { String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, this.bloomFilter); + + // hbase.hstore.compact.on.flush=true enables picking up an existing + // HStoreFIle from disk interlacing the memcache flush compacting as we + // go. The notion is that interlacing would take as long as a pure + // flush with the added benefit of having one less file in the store. + // Experiments show that it takes two to three times the amount of time + // flushing -- more column families makes it so the two timings come + // closer together -- but it also complicates the flush. Disabled for + // now. Needs work picking which file to interlace (favor references + // first, etc.) + // + // Related, looks like 'merging compactions' in BigTable paper interlaces + // a memcache flush. We don't. try { - for (Map.Entry es: inputCache.entrySet()) { - HStoreKey curkey = es.getKey(); - if (this.familyName. - equals(HStoreKey.extractFamily(curkey.getColumn()))) { - out.append(curkey, new ImmutableBytesWritable(es.getValue())); + if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) && + this.storefiles.size() > 0) { + compact(out, inputCache.entrySet().iterator(), + this.readers.get(this.storefiles.firstKey())); + } else { + for (Map.Entry es: inputCache.entrySet()) { + HStoreKey curkey = es.getKey(); + if (this.familyName. + equals(HStoreKey.extractFamily(curkey.getColumn()))) { + out.append(curkey, new ImmutableBytesWritable(es.getValue())); + } } } } finally { @@ -486,14 +507,14 @@ class HStore implements HConstants { this.lock.releaseWriteLock(); } } - return getAllMapFiles(); + return; } } /** * @return - vector of all the HStore files in use */ - Vector getAllMapFiles() { + Vector getAllStoreFiles() { this.lock.obtainReadLock(); try { return new Vector(storefiles.values()); @@ -505,6 +526,14 @@ class HStore implements HConstants { ////////////////////////////////////////////////////////////////////////////// // Compaction ////////////////////////////////////////////////////////////////////////////// + + /** + * @return True if this store needs compaction. + */ + public boolean needsCompaction() { + return this.storefiles != null && + this.storefiles.size() >= this.compactionThreshold; + } /** * Compact the back-HStores. This method may take some time, so the calling @@ -528,11 +557,24 @@ class HStore implements HConstants { compactHelper(false); } - void compactHelper(boolean deleteSequenceInfo) throws IOException { + void compactHelper(final boolean deleteSequenceInfo) + throws IOException { + compactHelper(deleteSequenceInfo, -1); + } + + /* + * @param deleteSequenceInfo True if we are to set the sequence number to -1 + * on compacted file. + * @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If + * so, pass it here. Otherwise, pass -1 and it will be calculated inside in + * this method. + * @throws IOException + */ + void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID) + throws IOException { synchronized(compactLock) { Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName); - fs.mkdirs(curCompactStore); if(LOG.isDebugEnabled()) { LOG.debug("started compaction of " + storefiles.size() + " files in " + curCompactStore.toString()); @@ -547,28 +589,32 @@ class HStore implements HConstants { this.lock.releaseWriteLock(); } - // Compute the max-sequenceID seen in any of the to-be-compacted - // TreeMaps - long maxSeenSeqID = -1; - for (HStoreFile hsf: toCompactFiles) { - long seqid = hsf.loadInfo(fs); - if(seqid > 0) { - if(seqid > maxSeenSeqID) { - maxSeenSeqID = seqid; - } - } - } - - HStoreFile compactedOutputFile - = new HStoreFile(conf, compactdir, regionName, familyName, -1); - if(toCompactFiles.size() == 1) { - // TODO: Only rewrite if NOT a HSF reference file. - if(LOG.isDebugEnabled()) { + HStoreFile compactedOutputFile = + new HStoreFile(conf, compactdir, regionName, familyName, -1); + if (toCompactFiles.size() < 1 || + (toCompactFiles.size() == 1 && + !toCompactFiles.get(0).isReference())) { + if (LOG.isDebugEnabled()) { LOG.debug("nothing to compact for " + this.storeName); } - HStoreFile hsf = toCompactFiles.elementAt(0); - if(hsf.loadInfo(fs) == -1) { - return; + if (deleteSequenceInfo && toCompactFiles.size() == 1) { + toCompactFiles.get(0).writeInfo(fs, -1); + } + return; + } + + fs.mkdirs(curCompactStore); + + // Compute the max-sequenceID seen in any of the to-be-compacted + // TreeMaps if it hasn't been passed in to us. + if (maxSeenSeqID == -1) { + for (HStoreFile hsf: toCompactFiles) { + long seqid = hsf.loadInfo(fs); + if(seqid > 0) { + if(seqid > maxSeenSeqID) { + maxSeenSeqID = seqid; + } + } } } @@ -577,108 +623,11 @@ class HStore implements HConstants { compactedOutputFile.getWriter(this.fs, this.compression, this.bloomFilter); try { - // We create a new set of MapFile.Reader objects so we don't screw up - // the caching associated with the currently-loaded ones. - // - // Our iteration-based access pattern is practically designed to ruin - // the cache. - // - // We work by opening a single MapFile.Reader for each file, and - // iterating through them in parallel. We always increment the - // lowest-ranked one. Updates to a single row/column will appear - // ranked by timestamp. This allows us to throw out deleted values or - // obsolete versions. - MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()]; - HStoreKey[] keys = new HStoreKey[toCompactFiles.size()]; - ImmutableBytesWritable[] vals = - new ImmutableBytesWritable[toCompactFiles.size()]; - boolean[] done = new boolean[toCompactFiles.size()]; - int pos = 0; - for(HStoreFile hsf: toCompactFiles) { - rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter); - keys[pos] = new HStoreKey(); - vals[pos] = new ImmutableBytesWritable(); - done[pos] = false; - pos++; - } - - // Now, advance through the readers in order. This will have the - // effect of a run-time sort of the entire dataset. - int numDone = 0; - for(int i = 0; i < rdrs.length; i++) { - rdrs[i].reset(); - done[i] = ! rdrs[i].next(keys[i], vals[i]); - if(done[i]) { - numDone++; - } - } - - int timesSeen = 0; - Text lastRow = new Text(); - Text lastColumn = new Text(); - while(numDone < done.length) { - // Find the reader with the smallest key - int smallestKey = -1; - for(int i = 0; i < rdrs.length; i++) { - if(done[i]) { - continue; - } - - if(smallestKey < 0) { - smallestKey = i; - } else { - if(keys[i].compareTo(keys[smallestKey]) < 0) { - smallestKey = i; - } - } - } - - // Reflect the current key/val in the output - HStoreKey sk = keys[smallestKey]; - if(lastRow.equals(sk.getRow()) - && lastColumn.equals(sk.getColumn())) { - timesSeen++; - } else { - timesSeen = 1; - } - - if(timesSeen <= family.getMaxVersions()) { - // Keep old versions until we have maxVersions worth. - // Then just skip them. - if(sk.getRow().getLength() != 0 - && sk.getColumn().getLength() != 0) { - // Only write out objects which have a non-zero length key and - // value - compactedOut.append(sk, vals[smallestKey]); - } - } - - // TODO: I don't know what to do about deleted values. I currently - // include the fact that the item was deleted as a legitimate - // "version" of the data. Maybe it should just drop the deleted - // val? - - // Update last-seen items - lastRow.set(sk.getRow()); - lastColumn.set(sk.getColumn()); - - // Advance the smallest key. If that reader's all finished, then - // mark it as done. - if(! rdrs[smallestKey].next(keys[smallestKey], - vals[smallestKey])) { - done[smallestKey] = true; - rdrs[smallestKey].close(); - numDone++; - } - } + compact(compactedOut, toCompactFiles); } finally { compactedOut.close(); } - if(LOG.isDebugEnabled()) { - LOG.debug("writing new compacted HStore " + compactedOutputFile); - } - // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. if((! deleteSequenceInfo) && maxSeenSeqID >= 0) { compactedOutputFile.writeInfo(fs, maxSeenSeqID); @@ -691,8 +640,7 @@ class HStore implements HConstants { DataOutputStream out = new DataOutputStream(fs.create(filesToReplace)); try { out.writeInt(toCompactFiles.size()); - for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { - HStoreFile hsf = it.next(); + for(HStoreFile hsf: toCompactFiles) { hsf.write(out); } } finally { @@ -706,7 +654,207 @@ class HStore implements HConstants { // Move the compaction into place. processReadyCompaction(); } finally { - fs.delete(compactdir); + if (fs.exists(compactdir)) { + fs.delete(compactdir); + } + } + } + } + + /* + * Compact passed toCompactFiles into compactedOut. + * We create a new set of MapFile.Reader objects so we don't screw up + * the caching associated with the currently-loaded ones. Our + * iteration-based access pattern is practically designed to ruin + * the cache. + * + * We work by opening a single MapFile.Reader for each file, and + * iterating through them in parallel. We always increment the + * lowest-ranked one. Updates to a single row/column will appear + * ranked by timestamp. This allows us to throw out deleted values or + * obsolete versions. + * @param compactedOut + * @param toCompactFiles + * @throws IOException + */ + void compact(final MapFile.Writer compactedOut, + final Vector toCompactFiles) + throws IOException { + int size = toCompactFiles.size(); + CompactionReader[] rdrs = new CompactionReader[size]; + int index = 0; + for (HStoreFile hsf: toCompactFiles) { + try { + rdrs[index++] = + new MapFileCompactionReader(hsf.getReader(fs, bloomFilter)); + } catch (IOException e) { + // Add info about which file threw exception. It may not be in the + // exception message so output a message here where we know the + // culprit. + LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() + + (hsf.isReference()? " " + hsf.getReference().toString(): "")); + throw e; + } + } + try { + compact(compactedOut, rdrs); + } finally { + for (int i = 0; i < rdrs.length; i++) { + if (rdrs[i] != null) { + try { + rdrs[i].close(); + } catch (IOException e) { + LOG.warn("Exception closing reader", e); + } + } + } + } + } + + interface CompactionReader { + public void close() throws IOException; + public boolean next(WritableComparable key, Writable val) + throws IOException; + public void reset() throws IOException; + } + + class MapFileCompactionReader implements CompactionReader { + final MapFile.Reader reader; + + MapFileCompactionReader(final MapFile.Reader r) { + this.reader = r; + } + + public void close() throws IOException { + this.reader.close(); + } + + public boolean next(WritableComparable key, Writable val) + throws IOException { + return this.reader.next(key, val); + } + + public void reset() throws IOException { + this.reader.reset(); + } + } + + void compact(final MapFile.Writer compactedOut, + final Iterator> iterator, + final MapFile.Reader reader) + throws IOException { + // Make an instance of a CompactionReader that wraps the iterator. + CompactionReader cr = new CompactionReader() { + public boolean next(WritableComparable key, Writable val) + throws IOException { + boolean result = false; + while (iterator.hasNext()) { + Entry e = iterator.next(); + HStoreKey hsk = e.getKey(); + if (familyName.equals(HStoreKey.extractFamily(hsk.getColumn()))) { + ((HStoreKey)key).set(hsk); + ((ImmutableBytesWritable)val).set(e.getValue()); + result = true; + break; + } + } + return result; + } + + @SuppressWarnings("unused") + public void reset() throws IOException { + // noop. + } + + @SuppressWarnings("unused") + public void close() throws IOException { + // noop. + } + }; + + compact(compactedOut, + new CompactionReader [] {cr, new MapFileCompactionReader(reader)}); + } + + void compact(final MapFile.Writer compactedOut, + final CompactionReader [] rdrs) + throws IOException { + HStoreKey[] keys = new HStoreKey[rdrs.length]; + ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length]; + boolean[] done = new boolean[rdrs.length]; + for(int i = 0; i < rdrs.length; i++) { + keys[i] = new HStoreKey(); + vals[i] = new ImmutableBytesWritable(); + done[i] = false; + } + + // Now, advance through the readers in order. This will have the + // effect of a run-time sort of the entire dataset. + int numDone = 0; + for(int i = 0; i < rdrs.length; i++) { + rdrs[i].reset(); + done[i] = ! rdrs[i].next(keys[i], vals[i]); + if(done[i]) { + numDone++; + } + } + + int timesSeen = 0; + Text lastRow = new Text(); + Text lastColumn = new Text(); + while(numDone < done.length) { + // Find the reader with the smallest key + int smallestKey = -1; + for(int i = 0; i < rdrs.length; i++) { + if(done[i]) { + continue; + } + if(smallestKey < 0) { + smallestKey = i; + } else { + if(keys[i].compareTo(keys[smallestKey]) < 0) { + smallestKey = i; + } + } + } + + // Reflect the current key/val in the output + HStoreKey sk = keys[smallestKey]; + if(lastRow.equals(sk.getRow()) + && lastColumn.equals(sk.getColumn())) { + timesSeen++; + } else { + timesSeen = 1; + } + + if(timesSeen <= family.getMaxVersions()) { + // Keep old versions until we have maxVersions worth. + // Then just skip them. + if(sk.getRow().getLength() != 0 + && sk.getColumn().getLength() != 0) { + // Only write out objects which have a non-zero length key and + // value + compactedOut.append(sk, vals[smallestKey]); + } + } + + // TODO: I don't know what to do about deleted values. I currently + // include the fact that the item was deleted as a legitimate + // "version" of the data. Maybe it should just drop the deleted + // val? + + // Update last-seen items + lastRow.set(sk.getRow()); + lastColumn.set(sk.getColumn()); + + // Advance the smallest key. If that reader's all finished, then + // mark it as done. + if(!rdrs[smallestKey].next(keys[smallestKey], + vals[smallestKey])) { + done[smallestKey] = true; + rdrs[smallestKey].close(); + rdrs[smallestKey] = null; + numDone++; } } } @@ -773,21 +921,19 @@ class HStore implements HConstants { } } + Vector toDelete = new Vector(keys.size()); for (Long key: keys) { MapFile.Reader reader = this.readers.remove(key); if (reader != null) { reader.close(); } HStoreFile hsf = this.storefiles.remove(key); - // 4. Delete all old files, no longer needed - hsf.delete(); - } - if(LOG.isDebugEnabled()) { - LOG.debug("deleted " + toCompactFiles.size() + " old file(s)"); + // 4. Add to the toDelete files all old files, no longer needed + toDelete.add(hsf); } - // What if we fail now? The above deletes will fail silently. We'd better - // make sure not to write out any new files with the same names as + // What if we fail now? The above deletes will fail silently. We'd + // better make sure not to write out any new files with the same names as // something we delete, though. // 5. Moving the new MapFile into place @@ -800,9 +946,23 @@ class HStore implements HConstants { compactdir.toString() + " to " + finalCompactedFile.toString() + " in " + dir.toString()); } - compactedFile.rename(this.fs, finalCompactedFile); + if (!compactedFile.rename(this.fs, finalCompactedFile)) { + LOG.error("Failed move of compacted file " + + finalCompactedFile.toString()); + return; + } + + // Safe to delete now compaction has been moved into place. + for (HStoreFile hsf: toDelete) { + if (hsf.getFileId() == finalCompactedFile.getFileId()) { + // Be careful we do not delte the just compacted file. + LOG.warn("Weird. File to delete has same name as one we are " + + "about to delete (skipping): " + hsf.getFileId()); + continue; + } + hsf.delete(); + } - // Fail here? No worries. Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); // 6. Loading the new TreeMap. @@ -810,7 +970,6 @@ class HStore implements HConstants { finalCompactedFile.getReader(this.fs, this.bloomFilter)); this.storefiles.put(orderVal, finalCompactedFile); } finally { - // 7. Releasing the write-lock this.lock.releaseWriteLock(); } @@ -838,6 +997,9 @@ class HStore implements HConstants { map.reset(); ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = (HStoreKey)map.getClosest(key, readval); + if (readkey == null) { + continue; + } do { Text readcol = readkey.getColumn(); if (results.get(readcol) == null @@ -1004,7 +1166,7 @@ class HStore implements HConstants { /** * @return Returns the number of map files currently in use */ - int getNMaps() { + int countOfStoreFiles() { this.lock.obtainReadLock(); try { return storefiles.size(); @@ -1014,6 +1176,22 @@ class HStore implements HConstants { } } + boolean hasReferences() { + boolean result = false; + this.lock.obtainReadLock(); + try { + for (HStoreFile hsf: this.storefiles.values()) { + if (hsf.isReference()) { + break; + } + } + + } finally { + this.lock.releaseReadLock(); + } + return result; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -1038,6 +1216,11 @@ class HStore implements HConstants { return new HStoreScanner(timestamp, targetCols, firstRow); } + + @Override + public String toString() { + return this.storeName; + } ////////////////////////////////////////////////////////////////////////////// // This class implements the HScannerInterface. diff --git a/src/java/org/apache/hadoop/hbase/HStoreFile.java b/src/java/org/apache/hadoop/hbase/HStoreFile.java index 37e325b2b66..a9e3c15c6f0 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreFile.java +++ b/src/java/org/apache/hadoop/hbase/HStoreFile.java @@ -399,7 +399,13 @@ public class HStoreFile implements HConstants, WritableComparable { Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); + LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " + + "Cleaned up info file. Continuing..."); + continue; } + + // TODO: Confirm referent exists. + // Found map and sympathetic info file. Add this hstorefile to result. results.add(curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next @@ -537,8 +543,7 @@ public class HStoreFile implements HConstants, WritableComparable { try { for(HStoreFile src: srcFiles) { - MapFile.Reader in = - new MapFile.Reader(fs, src.getMapFilePath().toString(), conf); + MapFile.Reader in = src.getReader(fs, null); try { HStoreKey readkey = new HStoreKey(); ImmutableBytesWritable readval = new ImmutableBytesWritable(); @@ -627,12 +632,23 @@ public class HStoreFile implements HConstants, WritableComparable { * hsf directory. * @param fs * @param hsf + * @return True if succeeded. * @throws IOException */ - public void rename(final FileSystem fs, final HStoreFile hsf) + public boolean rename(final FileSystem fs, final HStoreFile hsf) throws IOException { - fs.rename(getMapFilePath(), hsf.getMapFilePath()); - fs.rename(getInfoFilePath(), hsf.getInfoFilePath()); + boolean success = fs.rename(getMapFilePath(), hsf.getMapFilePath()); + if (!success) { + LOG.warn("Failed rename of " + getMapFilePath() + " to " + + hsf.getMapFilePath()); + return success; + } + success = fs.rename(getInfoFilePath(), hsf.getInfoFilePath()); + if (!success) { + LOG.warn("Failed rename of " + getInfoFilePath() + " to " + + hsf.getInfoFilePath()); + } + return success; } /** diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index 2cb94ebbfe7..452894ae1e8 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -301,6 +301,10 @@ public class HStoreKey implements WritableComparable { if(result == 0) { result = this.column.compareTo(other.column); if(result == 0) { + // The below older timestamps sorting ahead of newer timestamps looks + // wrong but it is intentional. This way, newer timestamps are first + // found when we iterate over a memcache and newer versions are the + // first we trip over when reading from a store file. if(this.timestamp < other.timestamp) { result = 1; } else if(this.timestamp > other.timestamp) { diff --git a/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java b/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java index c8be276c354..3f90032df53 100644 --- a/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java @@ -87,6 +87,13 @@ public class ImmutableBytesWritable implements WritableComparable { return this.bytes; } + /** + * @param b Use passed bytes as backing array for this instance. + */ + public void set(final byte [] b) { + this.bytes = b; + } + /** * @return the current size of the buffer. */ diff --git a/src/java/org/apache/hadoop/hbase/util/Writables.java b/src/java/org/apache/hadoop/hbase/util/Writables.java index de1a2346374..0ad4598168e 100644 --- a/src/java/org/apache/hadoop/hbase/util/Writables.java +++ b/src/java/org/apache/hadoop/hbase/util/Writables.java @@ -66,15 +66,16 @@ public class Writables { * @param w An empty Writable (usually made by calling the null-arg * constructor). * @return The passed Writable after its readFields has been called fed - * by the passed bytes array or null if passed null or - * empty bytes. + * by the passed bytes array or IllegalArgumentException + * if passed null or an empty bytes array. * @throws IOException + * @throws IllegalArgumentException */ public static Writable getWritable(final byte [] bytes, final Writable w) throws IOException { if (bytes == null || bytes.length == 0) { - throw new IllegalArgumentException( - "Con't build a writable with empty bytes array"); + throw new IllegalArgumentException("Can't build a writable with empty " + + "bytes array"); } if (w == null) { throw new IllegalArgumentException("Writable cannot be null"); diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 8ed9fee41f3..27218d36185 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -32,6 +32,14 @@ import org.apache.hadoop.io.Text; * Abstract base class for test cases. Performs all static initialization */ public abstract class HBaseTestCase extends TestCase { + public final static String COLFAMILY_NAME1 = "colfamily1:"; + public final static String COLFAMILY_NAME2 = "colfamily2:"; + public final static String COLFAMILY_NAME3 = "colfamily3:"; + protected Path testDir = null; + protected FileSystem localFs = null; + public static final char FIRST_CHAR = 'a'; + public static final char LAST_CHAR = 'z'; + static { StaticTestEnvironment.initialize(); } @@ -47,6 +55,29 @@ public abstract class HBaseTestCase extends TestCase { super(name); conf = new HBaseConfiguration(); } + + @Override + protected void setUp() throws Exception { + super.setUp(); + this.testDir = getUnitTestdir(getName()); + this.localFs = FileSystem.getLocal(this.conf); + if (localFs.exists(testDir)) { + localFs.delete(testDir); + } + } + + @Override + protected void tearDown() throws Exception { + try { + if (this.localFs != null && this.testDir != null && + this.localFs.exists(testDir)) { + this.localFs.delete(testDir); + } + } catch (Exception e) { + e.printStackTrace(); + } + super.tearDown(); + } protected Path getUnitTestdir(String testName) { return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName); @@ -63,4 +94,112 @@ public abstract class HBaseTestCase extends TestCase { new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), fs, conf, info, null); } + + protected HTableDescriptor createTableDescriptor(final String name) { + HTableDescriptor htd = new HTableDescriptor(name); + htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1)); + htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2)); + htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3)); + return htd; + } + + protected void addContent(final HRegion r, final String column) + throws IOException { + Text startKey = r.getRegionInfo().getStartKey(); + Text endKey = r.getRegionInfo().getEndKey(); + byte [] startKeyBytes = startKey.getBytes(); + if (startKeyBytes == null || startKeyBytes.length == 0) { + startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + } + addContent(new HRegionLoader(r), column, startKeyBytes, endKey); + } + + protected void addContent(final Loader updater, final String column) + throws IOException { + addContent(updater, column, + new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null); + } + + protected void addContent(final Loader updater, final String column, + final byte [] startKeyBytes, final Text endKey) + throws IOException { + // Add rows of three characters. The first character starts with the + // 'a' character and runs up to 'z'. Per first character, we run the + // second character over same range. And same for the third so rows + // (and values) look like this: 'aaa', 'aab', 'aac', etc. + char secondCharStart = (char)startKeyBytes[1]; + char thirdCharStart = (char)startKeyBytes[2]; + EXIT: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) { + for (char d = secondCharStart; d <= LAST_CHAR; d++) { + for (char e = thirdCharStart; e <= LAST_CHAR; e++) { + byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e}; + Text t = new Text(new String(bytes)); + if (endKey != null && endKey.getLength() > 0 + && endKey.compareTo(t) <= 0) { + break EXIT; + } + long lockid = updater.startBatchUpdate(t); + try { + updater.put(lockid, new Text(column), bytes); + updater.commit(lockid); + lockid = -1; + } finally { + if (lockid != -1) { + updater.abort(lockid); + } + } + } + // Set start character back to FIRST_CHAR after we've done first loop. + thirdCharStart = FIRST_CHAR; + } + secondCharStart = FIRST_CHAR; + } + } + + public interface Loader { + public long startBatchUpdate(final Text row) throws IOException; + public void put(long lockid, Text column, byte val[]) throws IOException; + public void commit(long lockid) throws IOException; + public void abort(long lockid) throws IOException; + } + + public class HRegionLoader implements Loader { + final HRegion region; + public HRegionLoader(final HRegion HRegion) { + super(); + this.region = HRegion; + } + public void abort(long lockid) throws IOException { + this.region.abort(lockid); + } + public void commit(long lockid) throws IOException { + this.region.commit(lockid, System.currentTimeMillis()); + } + public void put(long lockid, Text column, byte[] val) throws IOException { + this.region.put(lockid, column, val); + } + public long startBatchUpdate(Text row) throws IOException { + return this.region.startUpdate(row); + } + } + + public class HTableLoader implements Loader { + final HTable table; + public HTableLoader(final HTable table) { + super(); + this.table = table; + } + public void abort(long lockid) throws IOException { + this.table.abort(lockid); + } + public void commit(long lockid) throws IOException { + this.table.commit(lockid); + } + public void put(long lockid, Text column, byte[] val) throws IOException { + this.table.put(lockid, column, val); + } + public long startBatchUpdate(Text row) { + return this.table.startBatchUpdate(row); + } + } } \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java index 1e49bd6bb12..d1902f72ed9 100644 --- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -365,7 +365,9 @@ public class MiniHBaseCluster implements HConstants { shutdown(this.masterThread, this.regionThreads); // Close the file system. Will complain if files open so helps w/ leaks. try { - this.cluster.getFileSystem().close(); + if (this.cluster != null && this.cluster.getFileSystem() != null) { + this.cluster.getFileSystem().close(); + } } catch (IOException e) { LOG.error("Closing down dfs", e); } diff --git a/src/test/org/apache/hadoop/hbase/TestCompaction.java b/src/test/org/apache/hadoop/hbase/TestCompaction.java new file mode 100644 index 00000000000..caa68ebbd9a --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/TestCompaction.java @@ -0,0 +1,101 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Test compactions + */ +public class TestCompaction extends HBaseTestCase { + static final Log LOG = LogFactory.getLog(TestCompaction.class.getName()); + + protected void setUp() throws Exception { + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + /** + * Run compaction and flushing memcache + * @throws Exception + */ + public void testCompaction() throws Exception { + HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HTableDescriptor htd = createTableDescriptor(getName()); + HRegionInfo hri = new HRegionInfo(1, htd, null, null); + final HRegion r = + new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + try { + createStoreFile(r); + assertFalse(r.needsCompaction()); + int compactionThreshold = + this.conf.getInt("hbase.hstore.compactionThreshold", 3); + for (int i = 0; i < compactionThreshold; i++) { + createStoreFile(r); + } + assertTrue(r.needsCompaction()); + // Try to run compaction concurrent with a thread flush. + addContent(new HRegionLoader(r), COLFAMILY_NAME1); + Thread t1 = new Thread() { + @Override + public void run() { + try { + r.flushcache(false); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + Thread t2 = new Thread() { + @Override + public void run() { + try { + assertTrue(r.compactStores()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + t1.setDaemon(true); + t1.start(); + t2.setDaemon(true); + t2.start(); + t1.join(); + t2.join(); + } finally { + r.close(); + hlog.closeAndDelete(); + } + } + + private void createStoreFile(final HRegion r) throws IOException { + HRegionLoader loader = new HRegionLoader(r); + for (int i = 0; i < 3; i++) { + addContent(loader, COLFAMILY_NAME1); + } + r.flushcache(false); + } +} \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/TestCompare.java b/src/test/org/apache/hadoop/hbase/TestCompare.java index 0f5bcbc0c3b..0bd3c1dfc2e 100644 --- a/src/test/org/apache/hadoop/hbase/TestCompare.java +++ b/src/test/org/apache/hadoop/hbase/TestCompare.java @@ -26,7 +26,27 @@ import junit.framework.TestCase; * Test comparing HBase objects. */ public class TestCompare extends TestCase { - /** test case */ + + /** + * HStoreKey sorts as you would expect in the row and column portions but + * for the timestamps, it sorts in reverse with the newest sorting before + * the oldest (This is intentional so we trip over the latest first when + * iterating or looking in store files). + */ + public void testHStoreKey() { + long timestamp = System.currentTimeMillis(); + Text a = new Text("a"); + HStoreKey past = new HStoreKey(a, a, timestamp - 10); + HStoreKey now = new HStoreKey(a, a, timestamp); + HStoreKey future = new HStoreKey(a, a, timestamp + 10); + assertTrue(past.compareTo(now) > 0); + assertTrue(now.compareTo(now) == 0); + assertTrue(future.compareTo(now) < 0); + } + + /** + * Sort of HRegionInfo. + */ public void testHRegionInfo() { HRegionInfo a = new HRegionInfo(1, new HTableDescriptor("a"), null, null); HRegionInfo b = new HRegionInfo(2, new HTableDescriptor("b"), null, null); diff --git a/src/test/org/apache/hadoop/hbase/TestSplit.java b/src/test/org/apache/hadoop/hbase/TestSplit.java index aa172d90b03..7f5819e237c 100644 --- a/src/test/org/apache/hadoop/hbase/TestSplit.java +++ b/src/test/org/apache/hadoop/hbase/TestSplit.java @@ -38,61 +38,35 @@ import org.apache.log4j.Logger; * split and manufactures odd-ball split scenarios. */ public class TestSplit extends HBaseTestCase { - static final Log LOG = LogFactory.getLog(TestSplit.class); - private final static String COLFAMILY_NAME1 = "colfamily1:"; - private final static String COLFAMILY_NAME2 = "colfamily2:"; - private final static String COLFAMILY_NAME3 = "colfamily3:"; - private Path testDir = null; - private FileSystem fs = null; - private static final char FIRST_CHAR = 'a'; - private static final char LAST_CHAR = 'z'; - + static final Log LOG = LogFactory.getLog(TestSplit.class.getName()); + /** constructor */ public TestSplit() { Logger.getRootLogger().setLevel(Level.WARN); - Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); + Logger.getLogger(this.getClass().getPackage().getName()). + setLevel(Level.DEBUG); } /** {@inheritDoc} */ @Override public void setUp() throws Exception { super.setUp(); - this.testDir = getUnitTestdir(getName()); - this.fs = FileSystem.getLocal(this.conf); - if (fs.exists(testDir)) { - fs.delete(testDir); - } // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M conf.setLong("hbase.hregion.max.filesize", 1024 * 128); } - /** {@inheritDoc} */ - @Override - public void tearDown() throws Exception { - if (fs != null) { - try { - if (this.fs.exists(testDir)) { - this.fs.delete(testDir); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - super.tearDown(); - } - /** * Splits twice and verifies getting from each of the split regions. * @throws Exception */ public void testBasicSplit() throws Exception { HRegion region = null; - HLog hlog = new HLog(this.fs, this.testDir, this.conf); + HLog hlog = new HLog(this.localFs, this.testDir, this.conf); try { HTableDescriptor htd = createTableDescriptor(getName()); HRegionInfo hri = new HRegionInfo(1, htd, null, null); - region = new HRegion(testDir, hlog, fs, this.conf, hri, null); + region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); basicSplit(region); } finally { if (region != null) { @@ -102,14 +76,6 @@ public class TestSplit extends HBaseTestCase { } } - private HTableDescriptor createTableDescriptor(final String name) { - HTableDescriptor htd = new HTableDescriptor(name); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1)); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2)); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3)); - return htd; - } - private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); region.internalFlushcache(); @@ -184,13 +150,13 @@ public class TestSplit extends HBaseTestCase { * @throws Exception */ public void testSplitRegionIsDeleted() throws Exception { - final int retries = 10; - this.testDir = null; - this.fs = null; + final int retries = 10; // Start up a hbase cluster - MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1); - Path testDir = cluster.regionThreads.get(0).getRegionServer().rootDir; - FileSystem fs = cluster.getDFSCluster().getFileSystem(); + MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true); + Path d = cluster.regionThreads.get(0).getRegionServer().rootDir; + FileSystem fs = (cluster.getDFSCluster() == null)? + this.localFs: + cluster.getDFSCluster().getFileSystem(); HTable meta = null; HTable t = null; try { @@ -201,7 +167,7 @@ public class TestSplit extends HBaseTestCase { meta = new HTable(this.conf, HConstants.META_TABLE_NAME); int count = count(meta, HConstants.COLUMN_FAMILY_STR); t = new HTable(this.conf, new Text(getName())); - addContent(t, COLFAMILY_NAME3); + addContent(new HTableLoader(t), COLFAMILY_NAME3); // All is running in the one JVM so I should be able to get the // region instance and bring on a split. HRegionInfo hri = @@ -223,8 +189,7 @@ public class TestSplit extends HBaseTestCase { } HRegionInfo parent = getSplitParent(meta); assertTrue(parent.isOffline()); - Path parentDir = - HRegion.getRegionDir(testDir, parent.getRegionName()); + Path parentDir = HRegion.getRegionDir(d, parent.getRegionName()); assertTrue(fs.exists(parentDir)); LOG.info("Split happened and parent " + parent.getRegionName() + " is " + "offline"); @@ -263,7 +228,7 @@ public class TestSplit extends HBaseTestCase { for (int i = 0; i < 10; i++) { try { for (HRegion online: regions.values()) { - if (online.getRegionName().toString().startsWith(getName())) { + if (online.getTableDesc().getName().toString().equals(getName())) { online.compactStores(); } } @@ -403,79 +368,4 @@ public class TestSplit extends HBaseTestCase { assertEquals(regions.length, 2); return regions; } - - private void addContent(final HRegion r, final String column) - throws IOException { - Text startKey = r.getRegionInfo().getStartKey(); - Text endKey = r.getRegionInfo().getEndKey(); - byte [] startKeyBytes = startKey.getBytes(); - if (startKeyBytes == null || startKeyBytes.length == 0) { - startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; - } - // Add rows of three characters. The first character starts with the - // 'a' character and runs up to 'z'. Per first character, we run the - // second character over same range. And same for the third so rows - // (and values) look like this: 'aaa', 'aab', 'aac', etc. - char secondCharStart = (char)startKeyBytes[1]; - char thirdCharStart = (char)startKeyBytes[2]; - EXIT_ALL_LOOPS: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) { - for (char d = secondCharStart; d <= LAST_CHAR; d++) { - for (char e = thirdCharStart; e <= LAST_CHAR; e++) { - byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e}; - Text t = new Text(new String(bytes)); - if (endKey != null && endKey.getLength() > 0 - && endKey.compareTo(t) <= 0) { - break EXIT_ALL_LOOPS; - } - long lockid = r.startUpdate(t); - try { - r.put(lockid, new Text(column), bytes); - r.commit(lockid, System.currentTimeMillis()); - lockid = -1; - } finally { - if (lockid != -1) { - r.abort(lockid); - } - } - } - // Set start character back to FIRST_CHAR after we've done first loop. - thirdCharStart = FIRST_CHAR; - } - secondCharStart = FIRST_CHAR; - } - } - - // TODO: Have HTable and HRegion implement interface that has in it - // startUpdate, put, delete, commit, abort, etc. - private void addContent(final HTable table, final String column) - throws IOException { - byte [] startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; - // Add rows of three characters. The first character starts with the - // 'a' character and runs up to 'z'. Per first character, we run the - // second character over same range. And same for the third so rows - // (and values) look like this: 'aaa', 'aab', 'aac', etc. - char secondCharStart = (char)startKeyBytes[1]; - char thirdCharStart = (char)startKeyBytes[2]; - for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) { - for (char d = secondCharStart; d <= LAST_CHAR; d++) { - for (char e = thirdCharStart; e <= LAST_CHAR; e++) { - byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e}; - Text t = new Text(new String(bytes)); - long lockid = table.startUpdate(t); - try { - table.put(lockid, new Text(column), bytes); - table.commit(lockid, System.currentTimeMillis()); - lockid = -1; - } finally { - if (lockid != -1) { - table.abort(lockid); - } - } - } - // Set start character back to FIRST_CHAR after we've done first loop. - thirdCharStart = FIRST_CHAR; - } - secondCharStart = FIRST_CHAR; - } - } -} \ No newline at end of file +}